diff --git a/libs/utils/src/sync.rs b/libs/utils/src/sync.rs
index 7aa26e24bc..280637de8f 100644
--- a/libs/utils/src/sync.rs
+++ b/libs/utils/src/sync.rs
@@ -1,5 +1,6 @@
 pub mod heavier_once_cell;
 
+pub mod duplex;
 pub mod gate;
 
 pub mod spsc_fold;
diff --git a/libs/utils/src/sync/duplex.rs b/libs/utils/src/sync/duplex.rs
new file mode 100644
index 0000000000..fac79297a0
--- /dev/null
+++ b/libs/utils/src/sync/duplex.rs
@@ -0,0 +1 @@
+pub mod mpsc;
diff --git a/libs/utils/src/sync/duplex/mpsc.rs b/libs/utils/src/sync/duplex/mpsc.rs
new file mode 100644
index 0000000000..56b4e6d2b3
--- /dev/null
+++ b/libs/utils/src/sync/duplex/mpsc.rs
@@ -0,0 +1,36 @@
+use tokio::sync::mpsc;
+
+/// A bi-directional channel.
+pub struct Duplex<S, R> {
+    pub tx: mpsc::Sender<S>,
+    pub rx: mpsc::Receiver<R>,
+}
+
+/// Creates a bi-directional channel.
+///
+/// The channel will buffer up to the provided number of messages. Once the buffer is full,
+/// attempts to send new messages will wait until a message is received from the channel.
+/// The provided buffer capacity must be at least 1.
+pub fn channel<A, B>(buffer: usize) -> (Duplex<A, B>, Duplex<B, A>) {
+    let (tx_a, rx_a) = mpsc::channel::<A>(buffer);
+    let (tx_b, rx_b) = mpsc::channel::<B>(buffer);
+
+    (Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
+}
+
+impl<S, R> Duplex<S, R> {
+    /// Sends a value, waiting until there is capacity.
+    ///
+    /// A successful send occurs when it is determined that the other end of the channel has not hung up already.
+    pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError<S>> {
+        self.tx.send(x).await
+    }
+
+    /// Receives the next value for this receiver.
+    ///
+    /// This method returns `None` if the channel has been closed and there are
+    /// no remaining messages in the channel's buffer.
+    pub async fn recv(&mut self) -> Option<R> {
+        self.rx.recv().await
+    }
+}
diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs
index caacd365b3..b67a9cc479 100644
--- a/pageserver/benches/bench_ingest.rs
+++ b/pageserver/benches/bench_ingest.rs
@@ -62,10 +62,8 @@ async fn ingest(
     let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
 
     let gate = utils::sync::gate::Gate::default();
-    let entered = gate.enter().unwrap();
 
-    let layer =
-        InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;
+    let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &gate, &ctx).await?;
 
     let data = Value::Image(Bytes::from(vec![0u8; put_size]));
     let data_ser_size = data.serialized_size().unwrap() as usize;
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index de0abab4c0..aaec8a4c31 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -8,10 +8,8 @@ use crate::page_cache;
 use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
 use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
 use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
-use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
 use crate::virtual_file::owned_buffers_io::write::Buffer;
 use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
-use bytes::BytesMut;
 use camino::Utf8PathBuf;
 use num_traits::Num;
 use pageserver_api::shard::TenantShardId;
@@ -20,6 +18,7 @@ use tracing::error;
 
 use std::io;
 use std::sync::atomic::AtomicU64;
+use std::sync::Arc;
 use utils::id::TimelineId;
 
 pub struct EphemeralFile {
@@ -27,10 +26,7 @@ pub struct
EphemeralFile { _timeline_id: TimelineId, page_cache_file_id: page_cache::FileId, bytes_written: u64, - buffered_writer: owned_buffers_io::write::BufferedWriter< - BytesMut, - size_tracking_writer::Writer, - >, + buffered_writer: owned_buffers_io::write::BufferedWriter, /// Gate guard is held on as long as we need to do operations in the path (delete on drop) _gate_guard: utils::sync::gate::GateGuard, } @@ -42,9 +38,9 @@ impl EphemeralFile { conf: &PageServerConf, tenant_shard_id: TenantShardId, timeline_id: TimelineId, - gate_guard: utils::sync::gate::GateGuard, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, - ) -> Result { + ) -> anyhow::Result { static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1); let filename_disambiguator = NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -55,15 +51,17 @@ impl EphemeralFile { "ephemeral-{filename_disambiguator}" ))); - let file = VirtualFile::open_with_options( - &filename, - virtual_file::OpenOptions::new() - .read(true) - .write(true) - .create(true), - ctx, - ) - .await?; + let file = Arc::new( + VirtualFile::open_with_options_v2( + &filename, + virtual_file::OpenOptions::new() + .read(true) + .write(true) + .create(true), + ctx, + ) + .await?, + ); let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore @@ -73,10 +71,12 @@ impl EphemeralFile { page_cache_file_id, bytes_written: 0, buffered_writer: owned_buffers_io::write::BufferedWriter::new( - size_tracking_writer::Writer::new(file), - BytesMut::with_capacity(TAIL_SZ), + file, + || IoBufferMut::with_capacity(TAIL_SZ), + gate.enter()?, + ctx, ), - _gate_guard: gate_guard, + _gate_guard: gate.enter()?, }) } } @@ -85,7 +85,7 @@ impl Drop for EphemeralFile { fn drop(&mut self) { // unlink the file // we are clear to do this, because we have entered a gate - let path = self.buffered_writer.as_inner().as_inner().path(); + let path = self.buffered_writer.as_inner().path(); let res = std::fs::remove_file(path); if let Err(e) = res { if e.kind() != std::io::ErrorKind::NotFound { @@ -132,6 +132,18 @@ impl EphemeralFile { srcbuf: &[u8], ctx: &RequestContext, ) -> std::io::Result { + let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?; + if let Some(control) = control { + control.release().await; + } + Ok(pos) + } + + async fn write_raw_controlled( + &mut self, + srcbuf: &[u8], + ctx: &RequestContext, + ) -> std::io::Result<(u64, Option)> { let pos = self.bytes_written; let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| { @@ -145,9 +157,9 @@ impl EphemeralFile { })?; // Write the payload - let nwritten = self + let (nwritten, control) = self .buffered_writer - .write_buffered_borrowed(srcbuf, ctx) + .write_buffered_borrowed_controlled(srcbuf, ctx) .await?; assert_eq!( nwritten, @@ -157,7 +169,7 @@ impl EphemeralFile { self.bytes_written = new_bytes_written; - Ok(pos) + Ok((pos, control)) } } @@ -168,11 +180,12 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral dst: tokio_epoll_uring::Slice, ctx: &'a RequestContext, ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { - let file_size_tracking_writer = self.buffered_writer.as_inner(); - let flushed_offset = file_size_tracking_writer.bytes_written(); + let submitted_offset = self.buffered_writer.bytes_submitted(); - let buffer = self.buffered_writer.inspect_buffer(); - let buffered = &buffer[0..buffer.pending()]; + let mutable = self.buffered_writer.inspect_mutable(); + let mutable = 
&mutable[0..mutable.pending()]; + + let maybe_flushed = self.buffered_writer.inspect_maybe_flushed(); let dst_cap = dst.bytes_total().into_u64(); let end = { @@ -197,11 +210,42 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral } } } - let written_range = Range(start, std::cmp::min(end, flushed_offset)); - let buffered_range = Range(std::cmp::max(start, flushed_offset), end); + + let (written_range, maybe_flushed_range) = { + if maybe_flushed.is_some() { + // [ written ][ maybe_flushed ][ mutable ] + // <- TAIL_SZ -><- TAIL_SZ -> + // ^ + // `submitted_offset` + // <++++++ on disk +++++++????????????????> + ( + Range( + start, + std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)), + ), + Range( + std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)), + std::cmp::min(end, submitted_offset), + ), + ) + } else { + // [ written ][ mutable ] + // <- TAIL_SZ -> + // ^ + // `submitted_offset` + // <++++++ on disk +++++++++++++++++++++++> + ( + Range(start, std::cmp::min(end, submitted_offset)), + // zero len + Range(submitted_offset, u64::MIN), + ) + } + }; + + let mutable_range = Range(std::cmp::max(start, submitted_offset), end); let dst = if written_range.len() > 0 { - let file: &VirtualFile = file_size_tracking_writer.as_inner(); + let file: &VirtualFile = self.buffered_writer.as_inner(); let bounds = dst.bounds(); let slice = file .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx) @@ -211,19 +255,21 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral dst }; - let dst = if buffered_range.len() > 0 { - let offset_in_buffer = buffered_range + let dst = if maybe_flushed_range.len() > 0 { + let offset_in_buffer = maybe_flushed_range .0 - .checked_sub(flushed_offset) + .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64)) .unwrap() .into_usize(); - let to_copy = - &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())]; + // Checked previously the buffer is Some. 
+ let maybe_flushed = maybe_flushed.unwrap(); + let to_copy = &maybe_flushed + [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())]; let bounds = dst.bounds(); let mut view = dst.slice({ let start = written_range.len().into_usize(); let end = start - .checked_add(buffered_range.len().into_usize()) + .checked_add(maybe_flushed_range.len().into_usize()) .unwrap(); start..end }); @@ -234,6 +280,28 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral dst }; + let dst = if mutable_range.len() > 0 { + let offset_in_buffer = mutable_range + .0 + .checked_sub(submitted_offset) + .unwrap() + .into_usize(); + let to_copy = + &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())]; + let bounds = dst.bounds(); + let mut view = dst.slice({ + let start = + written_range.len().into_usize() + maybe_flushed_range.len().into_usize(); + let end = start.checked_add(mutable_range.len().into_usize()).unwrap(); + start..end + }); + view.as_mut_rust_slice_full_zeroed() + .copy_from_slice(to_copy); + Slice::from_buf_bounds(Slice::into_inner(view), bounds) + } else { + dst + }; + // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs Ok((dst, (end - start).into_usize())) @@ -295,7 +363,7 @@ mod tests { let gate = utils::sync::gate::Gate::default(); - let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) + let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx) .await .unwrap(); @@ -326,14 +394,15 @@ mod tests { let gate = utils::sync::gate::Gate::default(); - let mut file = - EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) - .await - .unwrap(); + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx) + .await + .unwrap(); - let cap = file.buffered_writer.inspect_buffer().capacity(); + let mutable = file.buffered_writer.inspect_mutable(); + let cap = mutable.capacity(); + let align = mutable.align(); - let write_nbytes = cap + cap / 2; + let write_nbytes = cap * 2 + cap / 2; let content: Vec = rand::thread_rng() .sample_iter(rand::distributions::Standard) @@ -341,30 +410,39 @@ mod tests { .collect(); let mut value_offsets = Vec::new(); - for i in 0..write_nbytes { - let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap(); + for range in (0..write_nbytes) + .step_by(align) + .map(|start| start..(start + align).min(write_nbytes)) + { + let off = file.write_raw(&content[range], &ctx).await.unwrap(); value_offsets.push(off); } - assert!(file.len() as usize == write_nbytes); - for i in 0..write_nbytes { - assert_eq!(value_offsets[i], i.into_u64()); - let buf = IoBufferMut::with_capacity(1); + assert_eq!(file.len() as usize, write_nbytes); + for (i, range) in (0..write_nbytes) + .step_by(align) + .map(|start| start..(start + align).min(write_nbytes)) + .enumerate() + { + assert_eq!(value_offsets[i], range.start.into_u64()); + let buf = IoBufferMut::with_capacity(range.len()); let (buf_slice, nread) = file - .read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx) + .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx) .await .unwrap(); let buf = buf_slice.into_inner(); - assert_eq!(nread, 1); - assert_eq!(&buf, &content[i..i + 1]); + assert_eq!(nread, range.len()); + assert_eq!(&buf, &content[range]); } - let file_contents = - std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap(); - assert_eq!(file_contents, &content[0..cap]); + let file_contents = 
std::fs::read(file.buffered_writer.as_inner().path()).unwrap(); + assert!(file_contents == content[0..cap * 2]); - let buffer_contents = file.buffered_writer.inspect_buffer(); - assert_eq!(buffer_contents, &content[cap..write_nbytes]); + let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap(); + assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]); + + let mutable_buffer_contents = file.buffered_writer.inspect_mutable(); + assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]); } #[tokio::test] @@ -373,16 +451,16 @@ mod tests { let gate = utils::sync::gate::Gate::default(); - let mut file = - EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) - .await - .unwrap(); + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx) + .await + .unwrap(); - let cap = file.buffered_writer.inspect_buffer().capacity(); + // mutable buffer and maybe_flushed buffer each has `cap` bytes. + let cap = file.buffered_writer.inspect_mutable().capacity(); let content: Vec = rand::thread_rng() .sample_iter(rand::distributions::Standard) - .take(cap + cap / 2) + .take(cap * 2 + cap / 2) .collect(); file.write_raw(&content, &ctx).await.unwrap(); @@ -390,23 +468,21 @@ mod tests { // assert the state is as this test expects it to be assert_eq!( &file.load_to_io_buf(&ctx).await.unwrap(), - &content[0..cap + cap / 2] + &content[0..cap * 2 + cap / 2] ); - let md = file - .buffered_writer - .as_inner() - .as_inner() - .path() - .metadata() - .unwrap(); + let md = file.buffered_writer.as_inner().path().metadata().unwrap(); assert_eq!( md.len(), - cap.into_u64(), - "buffered writer does one write if we write 1.5x buffer capacity" + 2 * cap.into_u64(), + "buffered writer requires one write to be flushed if we write 2.5x buffer capacity" ); assert_eq!( - &file.buffered_writer.inspect_buffer()[0..cap / 2], - &content[cap..cap + cap / 2] + &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap], + &content[cap..cap * 2] + ); + assert_eq!( + &file.buffered_writer.inspect_mutable()[0..cap / 2], + &content[cap * 2..cap * 2 + cap / 2] ); } @@ -422,19 +498,19 @@ mod tests { let gate = utils::sync::gate::Gate::default(); - let mut file = - EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) - .await - .unwrap(); - - let cap = file.buffered_writer.inspect_buffer().capacity(); + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx) + .await + .unwrap(); + let mutable = file.buffered_writer.inspect_mutable(); + let cap = mutable.capacity(); + let align = mutable.align(); let content: Vec = rand::thread_rng() .sample_iter(rand::distributions::Standard) - .take(cap + cap / 2) + .take(cap * 2 + cap / 2) .collect(); - file.write_raw(&content, &ctx).await.unwrap(); + let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap(); let test_read = |start: usize, len: usize| { let file = &file; @@ -454,16 +530,38 @@ mod tests { } }; + let test_read_all_offset_combinations = || { + async move { + test_read(align, align).await; + // border onto edge of file + test_read(cap - align, align).await; + // read across file and buffer + test_read(cap - align, 2 * align).await; + // stay from start of maybe flushed buffer + test_read(cap, align).await; + // completely within maybe flushed buffer + test_read(cap + align, align).await; + // border onto edge of maybe flushed buffer. 
+ test_read(cap * 2 - align, align).await; + // read across maybe flushed and mutable buffer + test_read(cap * 2 - align, 2 * align).await; + // read across three segments + test_read(cap - align, cap + 2 * align).await; + // completely within mutable buffer + test_read(cap * 2 + align, align).await; + } + }; + // completely within the file range - assert!(20 < cap, "test assumption"); - test_read(10, 10).await; - // border onto edge of file - test_read(cap - 10, 10).await; - // read across file and buffer - test_read(cap - 10, 20).await; - // stay from start of buffer - test_read(cap, 10).await; - // completely within buffer - test_read(cap + 10, 10).await; + assert!(align < cap, "test assumption"); + assert!(cap % align == 0); + + // test reads at different flush stages. + let not_started = control.unwrap().into_not_started(); + test_read_all_offset_combinations().await; + let in_progress = not_started.ready_to_flush(); + test_read_all_offset_combinations().await; + in_progress.wait_until_flush_is_done().await; + test_read_all_offset_combinations().await; } } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 4bb1bbf3cf..89b935947d 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -681,6 +681,7 @@ impl RemoteTimelineClient { layer_file_name: &LayerName, layer_metadata: &LayerFileMetadata, local_path: &Utf8Path, + gate: &utils::sync::gate::Gate, cancel: &CancellationToken, ctx: &RequestContext, ) -> Result { @@ -700,6 +701,7 @@ impl RemoteTimelineClient { layer_file_name, layer_metadata, local_path, + gate, cancel, ctx, ) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 739615be9c..c5ae466f3a 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -6,6 +6,7 @@ use std::collections::HashSet; use std::future::Future; use std::str::FromStr; +use std::sync::Arc; use std::time::SystemTime; use anyhow::{anyhow, Context}; @@ -26,9 +27,7 @@ use crate::span::{ use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerName; use crate::tenant::Generation; -#[cfg_attr(target_os = "macos", allow(unused_imports))] -use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; -use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}; +use crate::virtual_file::{on_fatal_io_error, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; use remote_storage::{ DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, @@ -60,6 +59,7 @@ pub async fn download_layer_file<'a>( layer_file_name: &'a LayerName, layer_metadata: &'a LayerFileMetadata, local_path: &Utf8Path, + gate: &utils::sync::gate::Gate, cancel: &CancellationToken, ctx: &RequestContext, ) -> Result { @@ -88,7 +88,9 @@ pub async fn download_layer_file<'a>( let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION); let bytes_amount = download_retry( - || async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await }, + || async { + download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await + }, &format!("download {remote_path:?}"), cancel, ) @@ -148,6 +150,7 @@ async fn download_object<'a>( storage: &'a GenericRemoteStorage, src_path: &RemotePath, 
dst_path: &Utf8PathBuf, + gate: &utils::sync::gate::Gate, cancel: &CancellationToken, #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext, ) -> Result { @@ -205,13 +208,16 @@ async fn download_object<'a>( } #[cfg(target_os = "linux")] crate::virtual_file::io_engine::IoEngine::TokioEpollUring => { - use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer}; - use bytes::BytesMut; + use crate::virtual_file::owned_buffers_io; async { - let destination_file = VirtualFile::create(dst_path, ctx) - .await - .with_context(|| format!("create a destination file for layer '{dst_path}'")) - .map_err(DownloadError::Other)?; + let destination_file = Arc::new( + VirtualFile::create(dst_path, ctx) + .await + .with_context(|| { + format!("create a destination file for layer '{dst_path}'") + }) + .map_err(DownloadError::Other)?, + ); let mut download = storage .download(src_path, &DownloadOpts::default(), cancel) @@ -219,14 +225,16 @@ async fn download_object<'a>( pausable_failpoint!("before-downloading-layer-stream-pausable"); + let mut buffered = owned_buffers_io::write::BufferedWriter::::new( + destination_file, + || IoBufferMut::with_capacity(super::BUFFER_SIZE), + gate.enter().map_err(|_| DownloadError::Cancelled)?, + ctx, + ); + // TODO: use vectored write (writev) once supported by tokio-epoll-uring. // There's chunks_vectored() on the stream. let (bytes_amount, destination_file) = async { - let size_tracking = size_tracking_writer::Writer::new(destination_file); - let mut buffered = owned_buffers_io::write::BufferedWriter::::new( - size_tracking, - BytesMut::with_capacity(super::BUFFER_SIZE), - ); while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await { @@ -234,10 +242,10 @@ async fn download_object<'a>( Ok(chunk) => chunk, Err(e) => return Err(e), }; - buffered.write_buffered(chunk.slice_len(), ctx).await?; + buffered.write_buffered_borrowed(&chunk, ctx).await?; } - let size_tracking = buffered.flush_and_into_inner(ctx).await?; - Ok(size_tracking.into_inner()) + let inner = buffered.flush_and_into_inner(ctx).await?; + Ok(inner) } .await?; diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 701e4cf04b..395e34e404 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -1183,6 +1183,7 @@ impl<'a> TenantDownloader<'a> { &layer.name, &layer.metadata, &local_path, + &self.secondary_state.gate, &self.secondary_state.cancel, ctx, ) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index af6112d535..71e53da20f 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -555,13 +555,12 @@ impl InMemoryLayer { timeline_id: TimelineId, tenant_shard_id: TenantShardId, start_lsn: Lsn, - gate_guard: utils::sync::gate::GateGuard, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> Result { trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); - let file = - EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?; + let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?; let key = InMemoryLayerFileId(file.page_cache_file_id()); Ok(InMemoryLayer { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 
a9f1189b41..8933e8ceb1 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1149,6 +1149,7 @@ impl LayerInner { &self.desc.layer_name(), &self.metadata(), &self.path, + &timeline.gate, &timeline.cancel, ctx, ) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 1414bef0a5..fc741826ab 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3455,7 +3455,6 @@ impl Timeline { ctx: &RequestContext, ) -> anyhow::Result> { let mut guard = self.layers.write().await; - let gate_guard = self.gate.enter().context("enter gate for inmem layer")?; let last_record_lsn = self.get_last_record_lsn(); ensure!( @@ -3472,7 +3471,7 @@ impl Timeline { self.conf, self.timeline_id, self.tenant_shard_id, - gate_guard, + &self.gate, ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 4293a44dca..3888e7f86a 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -182,7 +182,7 @@ impl OpenLayerManager { conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, - gate_guard: utils::sync::gate::GateGuard, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result> { ensure!(lsn.is_aligned()); @@ -212,15 +212,9 @@ impl OpenLayerManager { lsn ); - let new_layer = InMemoryLayer::create( - conf, - timeline_id, - tenant_shard_id, - start_lsn, - gate_guard, - ctx, - ) - .await?; + let new_layer = + InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, gate, ctx) + .await?; let layer = Arc::new(new_layer); self.layer_map.open_layer = Some(layer.clone()); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index b9f8c7ea20..8a7f4a4bf5 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -20,7 +20,7 @@ use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer; use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign}; -use owned_buffers_io::io_buf_aligned::IoBufAlignedMut; +use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut}; use owned_buffers_io::io_buf_ext::FullSlice; use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; use pageserver_api::shard::TenantShardId; @@ -63,9 +63,6 @@ pub(crate) mod owned_buffers_io { pub(crate) mod io_buf_ext; pub(crate) mod slice; pub(crate) mod write; - pub(crate) mod util { - pub(crate) mod size_tracking_writer; - } } #[derive(Debug)] @@ -221,7 +218,7 @@ impl VirtualFile { self.inner.read_exact_at_page(page, offset, ctx).await } - pub async fn write_all_at( + pub async fn write_all_at( &self, buf: FullSlice, offset: u64, @@ -1325,14 +1322,14 @@ impl Drop for VirtualFileInner { } impl OwnedAsyncWriter for VirtualFile { - #[inline(always)] - async fn write_all( - &mut self, + async fn write_all_at( + &self, buf: FullSlice, + offset: u64, ctx: &RequestContext, - ) -> std::io::Result<(usize, FullSlice)> { - let (buf, res) = VirtualFile::write_all(self, buf, ctx).await; - res.map(move |v| (v, buf)) + ) -> std::io::Result> { + let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await; + res.map(|_| buf) } } @@ -1451,7 +1448,7 @@ mod tests { } } } - async fn write_all_at( + async fn write_all_at( &self, buf: FullSlice, offset: u64, @@ -1594,6 +1591,7 @@ mod tests { &ctx, ) 
.await?; + file_a .write_all(b"foobar".to_vec().slice_len(), &ctx) .await?; @@ -1652,10 +1650,10 @@ mod tests { ) .await?; file_b - .write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx) + .write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx) .await?; file_b - .write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx) + .write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx) .await?; assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA"); diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs index 933b78a13b..6b9992643f 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs @@ -4,7 +4,7 @@ pub trait Alignment: std::marker::Unpin + 'static { } /// Alignment at compile time. -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct ConstAlign; impl Alignment for ConstAlign { @@ -14,7 +14,7 @@ impl Alignment for ConstAlign { } /// Alignment at run time. -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct RuntimeAlign { align: usize, } diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs index 2fba6d699b..a5c26cd746 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs @@ -3,9 +3,10 @@ use std::{ sync::Arc, }; -use super::{alignment::Alignment, raw::RawAlignedBuffer}; +use super::{alignment::Alignment, raw::RawAlignedBuffer, AlignedBufferMut, ConstAlign}; /// An shared, immutable aligned buffer type. +#[derive(Clone, Debug)] pub struct AlignedBuffer { /// Shared raw buffer. raw: Arc>, @@ -86,6 +87,13 @@ impl AlignedBuffer { range: begin..end, } } + + /// Returns the mutable aligned buffer, if the immutable aligned buffer + /// has exactly one strong reference. Otherwise returns `None`. + pub fn into_mut(self) -> Option> { + let raw = Arc::into_inner(self.raw)?; + Some(AlignedBufferMut::from_raw(raw)) + } } impl Deref for AlignedBuffer { @@ -108,6 +116,14 @@ impl PartialEq<[u8]> for AlignedBuffer { } } +impl From<&[u8; N]> for AlignedBuffer> { + fn from(value: &[u8; N]) -> Self { + let mut buf = AlignedBufferMut::with_capacity(N); + buf.extend_from_slice(value); + buf.freeze() + } +} + /// SAFETY: the underlying buffer references a stable memory region. unsafe impl tokio_epoll_uring::IoBuf for AlignedBuffer { fn stable_ptr(&self) -> *const u8 { diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs index b3675d1aea..d2f5e206bb 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs @@ -1,4 +1,7 @@ -use std::ops::{Deref, DerefMut}; +use std::{ + mem::MaybeUninit, + ops::{Deref, DerefMut}, +}; use super::{ alignment::{Alignment, ConstAlign}, @@ -46,6 +49,11 @@ impl AlignedBufferMut> { } impl AlignedBufferMut { + /// Constructs a mutable aligned buffer from raw. + pub(super) fn from_raw(raw: RawAlignedBuffer) -> Self { + AlignedBufferMut { raw } + } + /// Returns the total number of bytes the buffer can hold. 
#[inline] pub fn capacity(&self) -> usize { @@ -128,6 +136,39 @@ impl AlignedBufferMut { let len = self.len(); AlignedBuffer::from_raw(self.raw, 0..len) } + + /// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed. + #[inline] + pub fn extend_from_slice(&mut self, extend: &[u8]) { + let cnt = extend.len(); + self.reserve(cnt); + + // SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy. + unsafe { + let dst = self.spare_capacity_mut(); + // Reserved above + debug_assert!(dst.len() >= cnt); + + core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt); + } + // SAFETY: We do have at least `cnt` bytes remaining before advance. + unsafe { + bytes::BufMut::advance_mut(self, cnt); + } + } + + /// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit`. + #[inline] + fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit] { + // SAFETY: we guarantees that the `Self::capacity()` bytes from + // `Self::as_mut_ptr()` are allocated. + unsafe { + let ptr = self.as_mut_ptr().add(self.len()); + let len = self.capacity() - self.len(); + + core::slice::from_raw_parts_mut(ptr.cast(), len) + } + } } impl Deref for AlignedBufferMut { diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs index dba695196e..4ea6b17744 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs @@ -1,9 +1,15 @@ -use tokio_epoll_uring::IoBufMut; +use tokio_epoll_uring::{IoBuf, IoBufMut}; -use crate::virtual_file::{IoBufferMut, PageWriteGuardBuf}; +use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf}; +/// A marker trait for a mutable aligned buffer type. pub trait IoBufAlignedMut: IoBufMut {} +/// A marker trait for an aligned buffer type. +pub trait IoBufAligned: IoBuf {} + impl IoBufAlignedMut for IoBufferMut {} +impl IoBufAligned for IoBuffer {} + impl IoBufAlignedMut for PageWriteGuardBuf {} diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs index c3940cf6ce..525f447b6d 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs @@ -5,6 +5,8 @@ use bytes::{Bytes, BytesMut}; use std::ops::{Deref, Range}; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; +use super::write::CheapCloneForRead; + /// The true owned equivalent for Rust [`slice`]. Use this for the write path. /// /// Unlike [`tokio_epoll_uring::Slice`], which we unfortunately inherited from `tokio-uring`, @@ -43,6 +45,18 @@ where } } +impl CheapCloneForRead for FullSlice +where + B: IoBuf + CheapCloneForRead, +{ + fn cheap_clone(&self) -> Self { + let bounds = self.slice.bounds(); + let clone = self.slice.get_ref().cheap_clone(); + let slice = clone.slice(bounds); + Self { slice } + } +} + pub(crate) trait IoBufExt { /// Get a [`FullSlice`] for the entire buffer, i.e., `self[..]` or `self[0..self.len()]`. 
fn slice_len(self) -> FullSlice diff --git a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs deleted file mode 100644 index efcb61ba65..0000000000 --- a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::{ - context::RequestContext, - virtual_file::owned_buffers_io::{io_buf_ext::FullSlice, write::OwnedAsyncWriter}, -}; -use tokio_epoll_uring::IoBuf; - -pub struct Writer { - dst: W, - bytes_amount: u64, -} - -impl Writer { - pub fn new(dst: W) -> Self { - Self { - dst, - bytes_amount: 0, - } - } - - pub fn bytes_written(&self) -> u64 { - self.bytes_amount - } - - pub fn as_inner(&self) -> &W { - &self.dst - } - - /// Returns the wrapped `VirtualFile` object as well as the number - /// of bytes that were written to it through this object. - #[cfg_attr(target_os = "macos", allow(dead_code))] - pub fn into_inner(self) -> (u64, W) { - (self.bytes_amount, self.dst) - } -} - -impl OwnedAsyncWriter for Writer -where - W: OwnedAsyncWriter, -{ - #[inline(always)] - async fn write_all( - &mut self, - buf: FullSlice, - ctx: &RequestContext, - ) -> std::io::Result<(usize, FullSlice)> { - let (nwritten, buf) = self.dst.write_all(buf, ctx).await?; - self.bytes_amount += u64::try_from(nwritten).unwrap(); - Ok((nwritten, buf)) - } -} diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 568cf62e56..20bf878123 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -1,55 +1,88 @@ -use bytes::BytesMut; +mod flush; +use std::sync::Arc; + +use flush::FlushHandle; use tokio_epoll_uring::IoBuf; -use crate::context::RequestContext; +use crate::{ + context::RequestContext, + virtual_file::{IoBuffer, IoBufferMut}, +}; -use super::io_buf_ext::{FullSlice, IoBufExt}; +use super::{ + io_buf_aligned::IoBufAligned, + io_buf_ext::{FullSlice, IoBufExt}, +}; + +pub(crate) use flush::FlushControl; + +pub(crate) trait CheapCloneForRead { + /// Returns a cheap clone of the buffer. + fn cheap_clone(&self) -> Self; +} + +impl CheapCloneForRead for IoBuffer { + fn cheap_clone(&self) -> Self { + // Cheap clone over an `Arc`. + self.clone() + } +} /// A trait for doing owned-buffer write IO. /// Think [`tokio::io::AsyncWrite`] but with owned buffers. +/// The owned buffers need to be aligned due to Direct IO requirements. pub trait OwnedAsyncWriter { - async fn write_all( - &mut self, + fn write_all_at( + &self, buf: FullSlice, + offset: u64, ctx: &RequestContext, - ) -> std::io::Result<(usize, FullSlice)>; + ) -> impl std::future::Future>> + Send; } /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch /// small writes into larger writes of size [`Buffer::cap`]. -/// -/// # Passthrough Of Large Writers -/// -/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`] -/// cause the internal buffer to be flushed prematurely so that the large -/// buffered write is passed through to the underlying [`OwnedAsyncWriter`]. -/// -/// This pass-through is generally beneficial for throughput, but if -/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource, -/// unlimited large writes may cause latency or fairness issues. -/// -/// In such cases, a different implementation that always buffers in memory -/// may be preferable. 
-pub struct BufferedWriter { - writer: W, +// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput, +// since we would avoid copying majority of the data into the internal buffer. +pub struct BufferedWriter { + writer: Arc, /// invariant: always remains Some(buf) except /// - while IO is ongoing => goes back to Some() once the IO completed successfully /// - after an IO error => stays `None` forever /// /// In these exceptional cases, it's `None`. - buf: Option, + mutable: Option, + /// A handle to the background flush task for writting data to disk. + flush_handle: FlushHandle, + /// The number of bytes submitted to the background task. + bytes_submitted: u64, } impl BufferedWriter where - B: Buffer + Send, - Buf: IoBuf + Send, - W: OwnedAsyncWriter, + B: Buffer + Send + 'static, + Buf: IoBufAligned + Send + Sync + CheapCloneForRead, + W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug, { - pub fn new(writer: W, buf: B) -> Self { + /// Creates a new buffered writer. + /// + /// The `buf_new` function provides a way to initialize the owned buffers used by this writer. + pub fn new( + writer: Arc, + buf_new: impl Fn() -> B, + gate_guard: utils::sync::gate::GateGuard, + ctx: &RequestContext, + ) -> Self { Self { - writer, - buf: Some(buf), + writer: writer.clone(), + mutable: Some(buf_new()), + flush_handle: FlushHandle::spawn_new( + writer, + buf_new(), + gate_guard, + ctx.attached_child(), + ), + bytes_submitted: 0, } } @@ -57,87 +90,70 @@ where &self.writer } + /// Returns the number of bytes submitted to the background flush task. + pub fn bytes_submitted(&self) -> u64 { + self.bytes_submitted + } + /// Panics if used after any of the write paths returned an error - pub fn inspect_buffer(&self) -> &B { - self.buf() + pub fn inspect_mutable(&self) -> &B { + self.mutable() + } + + /// Gets a reference to the maybe flushed read-only buffer. + /// Returns `None` if the writer has not submitted any flush request. + pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice> { + self.flush_handle.maybe_flushed.as_ref() } #[cfg_attr(target_os = "macos", allow(dead_code))] - pub async fn flush_and_into_inner(mut self, ctx: &RequestContext) -> std::io::Result { + pub async fn flush_and_into_inner( + mut self, + ctx: &RequestContext, + ) -> std::io::Result<(u64, Arc)> { self.flush(ctx).await?; - let Self { buf, writer } = self; + let Self { + mutable: buf, + writer, + mut flush_handle, + bytes_submitted: bytes_amount, + } = self; + flush_handle.shutdown().await?; assert!(buf.is_some()); - Ok(writer) + Ok((bytes_amount, writer)) } + /// Gets a reference to the mutable in-memory buffer. #[inline(always)] - fn buf(&self) -> &B { - self.buf + fn mutable(&self) -> &B { + self.mutable .as_ref() .expect("must not use after we returned an error") } - /// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted. 
- #[cfg_attr(target_os = "macos", allow(dead_code))] - pub async fn write_buffered( + pub async fn write_buffered_borrowed( &mut self, - chunk: FullSlice, + chunk: &[u8], ctx: &RequestContext, - ) -> std::io::Result<(usize, FullSlice)> { - let chunk = chunk.into_raw_slice(); - - let chunk_len = chunk.len(); - // avoid memcpy for the middle of the chunk - if chunk.len() >= self.buf().cap() { - self.flush(ctx).await?; - // do a big write, bypassing `buf` - assert_eq!( - self.buf - .as_ref() - .expect("must not use after an error") - .pending(), - 0 - ); - let (nwritten, chunk) = self - .writer - .write_all(FullSlice::must_new(chunk), ctx) - .await?; - assert_eq!(nwritten, chunk_len); - return Ok((nwritten, chunk)); + ) -> std::io::Result { + let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?; + if let Some(control) = control { + control.release().await; } - // in-memory copy the < BUFFER_SIZED tail of the chunk - assert!(chunk.len() < self.buf().cap()); - let mut slice = &chunk[..]; - while !slice.is_empty() { - let buf = self.buf.as_mut().expect("must not use after an error"); - let need = buf.cap() - buf.pending(); - let have = slice.len(); - let n = std::cmp::min(need, have); - buf.extend_from_slice(&slice[..n]); - slice = &slice[n..]; - if buf.pending() >= buf.cap() { - assert_eq!(buf.pending(), buf.cap()); - self.flush(ctx).await?; - } - } - assert!(slice.is_empty(), "by now we should have drained the chunk"); - Ok((chunk_len, FullSlice::must_new(chunk))) + Ok(len) } - /// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data. - /// - /// It is less performant because we always have to copy the borrowed data into the internal buffer - /// before we can do the IO. The [`Self::write_buffered`] can avoid this, which is more performant - /// for large writes. - pub async fn write_buffered_borrowed( + /// In addition to bytes submitted in this write, also returns a handle that can control the flush behavior. 
+ pub(crate) async fn write_buffered_borrowed_controlled( &mut self, mut chunk: &[u8], ctx: &RequestContext, - ) -> std::io::Result { + ) -> std::io::Result<(usize, Option)> { let chunk_len = chunk.len(); + let mut control: Option = None; while !chunk.is_empty() { - let buf = self.buf.as_mut().expect("must not use after an error"); + let buf = self.mutable.as_mut().expect("must not use after an error"); let need = buf.cap() - buf.pending(); let have = chunk.len(); let n = std::cmp::min(need, have); @@ -145,26 +161,27 @@ where chunk = &chunk[n..]; if buf.pending() >= buf.cap() { assert_eq!(buf.pending(), buf.cap()); - self.flush(ctx).await?; + if let Some(control) = control.take() { + control.release().await; + } + control = self.flush(ctx).await?; } } - Ok(chunk_len) + Ok((chunk_len, control)) } - async fn flush(&mut self, ctx: &RequestContext) -> std::io::Result<()> { - let buf = self.buf.take().expect("must not use after an error"); + #[must_use = "caller must explcitly check the flush control"] + async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result> { + let buf = self.mutable.take().expect("must not use after an error"); let buf_len = buf.pending(); if buf_len == 0 { - self.buf = Some(buf); - return Ok(()); + self.mutable = Some(buf); + return Ok(None); } - let slice = buf.flush(); - let (nwritten, slice) = self.writer.write_all(slice, ctx).await?; - assert_eq!(nwritten, buf_len); - self.buf = Some(Buffer::reuse_after_flush( - slice.into_raw_slice().into_inner(), - )); - Ok(()) + let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?; + self.bytes_submitted += u64::try_from(buf_len).unwrap(); + self.mutable = Some(recycled); + Ok(Some(flush_control)) } } @@ -192,64 +209,77 @@ pub trait Buffer { fn reuse_after_flush(iobuf: Self::IoBuf) -> Self; } -impl Buffer for BytesMut { - type IoBuf = BytesMut; +impl Buffer for IoBufferMut { + type IoBuf = IoBuffer; - #[inline(always)] fn cap(&self) -> usize { self.capacity() } fn extend_from_slice(&mut self, other: &[u8]) { - BytesMut::extend_from_slice(self, other) + if self.len() + other.len() > self.cap() { + panic!("Buffer capacity exceeded"); + } + + IoBufferMut::extend_from_slice(self, other); } - #[inline(always)] fn pending(&self) -> usize { self.len() } - fn flush(self) -> FullSlice { - self.slice_len() + fn flush(self) -> FullSlice { + self.freeze().slice_len() } - fn reuse_after_flush(mut iobuf: BytesMut) -> Self { - iobuf.clear(); - iobuf - } -} - -impl OwnedAsyncWriter for Vec { - async fn write_all( - &mut self, - buf: FullSlice, - _: &RequestContext, - ) -> std::io::Result<(usize, FullSlice)> { - self.extend_from_slice(&buf[..]); - Ok((buf.len(), buf)) + /// Caller should make sure that `iobuf` only have one strong reference before invoking this method. + fn reuse_after_flush(iobuf: Self::IoBuf) -> Self { + let mut recycled = iobuf + .into_mut() + .expect("buffer should only have one strong reference"); + recycled.clear(); + recycled } } #[cfg(test)] mod tests { - use bytes::BytesMut; + use std::sync::Mutex; use super::*; use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::TaskKind; - #[derive(Default)] + #[derive(Default, Debug)] struct RecorderWriter { - writes: Vec>, + /// record bytes and write offsets. + writes: Mutex, u64)>>, } + + impl RecorderWriter { + /// Gets recorded bytes and write offsets. 
+ fn get_writes(&self) -> Vec> { + self.writes + .lock() + .unwrap() + .iter() + .map(|(buf, _)| buf.clone()) + .collect() + } + } + impl OwnedAsyncWriter for RecorderWriter { - async fn write_all( - &mut self, + async fn write_all_at( + &self, buf: FullSlice, + offset: u64, _: &RequestContext, - ) -> std::io::Result<(usize, FullSlice)> { - self.writes.push(Vec::from(&buf[..])); - Ok((buf.len(), buf)) + ) -> std::io::Result> { + self.writes + .lock() + .unwrap() + .push((Vec::from(&buf[..]), offset)); + Ok(buf) } } @@ -257,71 +287,21 @@ mod tests { RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error) } - macro_rules! write { - ($writer:ident, $data:literal) => {{ - $writer - .write_buffered(::bytes::Bytes::from_static($data).slice_len(), &test_ctx()) - .await?; - }}; - } - #[tokio::test] - async fn test_buffered_writes_only() -> std::io::Result<()> { - let recorder = RecorderWriter::default(); - let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); - write!(writer, b"a"); - write!(writer, b"b"); - write!(writer, b"c"); - write!(writer, b"d"); - write!(writer, b"e"); - let recorder = writer.flush_and_into_inner(&test_ctx()).await?; - assert_eq!( - recorder.writes, - vec![Vec::from(b"ab"), Vec::from(b"cd"), Vec::from(b"e")] - ); - Ok(()) - } - - #[tokio::test] - async fn test_passthrough_writes_only() -> std::io::Result<()> { - let recorder = RecorderWriter::default(); - let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); - write!(writer, b"abc"); - write!(writer, b"de"); - write!(writer, b""); - write!(writer, b"fghijk"); - let recorder = writer.flush_and_into_inner(&test_ctx()).await?; - assert_eq!( - recorder.writes, - vec![Vec::from(b"abc"), Vec::from(b"de"), Vec::from(b"fghijk")] - ); - Ok(()) - } - - #[tokio::test] - async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> { - let recorder = RecorderWriter::default(); - let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); - write!(writer, b"a"); - write!(writer, b"bc"); - write!(writer, b"d"); - write!(writer, b"e"); - let recorder = writer.flush_and_into_inner(&test_ctx()).await?; - assert_eq!( - recorder.writes, - vec![Vec::from(b"a"), Vec::from(b"bc"), Vec::from(b"de")] - ); - Ok(()) - } - - #[tokio::test] - async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> { + async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> { let ctx = test_ctx(); let ctx = &ctx; - let recorder = RecorderWriter::default(); - let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); + let recorder = Arc::new(RecorderWriter::default()); + let gate = utils::sync::gate::Gate::default(); + let mut writer = BufferedWriter::<_, RecorderWriter>::new( + recorder, + || IoBufferMut::with_capacity(2), + gate.enter()?, + ctx, + ); writer.write_buffered_borrowed(b"abc", ctx).await?; + writer.write_buffered_borrowed(b"", ctx).await?; writer.write_buffered_borrowed(b"d", ctx).await?; writer.write_buffered_borrowed(b"e", ctx).await?; writer.write_buffered_borrowed(b"fg", ctx).await?; @@ -329,9 +309,9 @@ mod tests { writer.write_buffered_borrowed(b"j", ctx).await?; writer.write_buffered_borrowed(b"klmno", ctx).await?; - let recorder = writer.flush_and_into_inner(ctx).await?; + let (_, recorder) = writer.flush_and_into_inner(ctx).await?; assert_eq!( - recorder.writes, + recorder.get_writes(), { let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"]; expect diff --git 
a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
new file mode 100644
index 0000000000..9ce8b311bb
--- /dev/null
+++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
@@ -0,0 +1,314 @@
+use std::sync::Arc;
+
+use utils::sync::duplex;
+
+use crate::{
+    context::RequestContext,
+    virtual_file::owned_buffers_io::{io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice},
+};
+
+use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
+
+/// A handle to the flush task.
+pub struct FlushHandle<Buf, W> {
+    inner: Option<FlushHandleInner<Buf, W>>,
+    /// Immutable buffer for serving tail reads.
+    /// `None` if no flush request has been submitted.
+    pub(super) maybe_flushed: Option<FullSlice<Buf>>,
+}
+
+pub struct FlushHandleInner<Buf, W> {
+    /// A bi-directional channel that sends (buffer, offset) for writes,
+    /// and receives the recycled buffer.
+    channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
+    /// Join handle for the background flush task.
+    join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
+}
+
+struct FlushRequest<Buf> {
+    slice: FullSlice<Buf>,
+    offset: u64,
+    #[cfg(test)]
+    ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
+    #[cfg(test)]
+    done_flush_tx: tokio::sync::oneshot::Sender<()>,
+}
+
+/// Constructs a request and a control object for a new flush operation.
+#[cfg(not(test))]
+fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
+    let request = FlushRequest { slice, offset };
+    let control = FlushControl::untracked();
+
+    (request, control)
+}
+
+/// Constructs a request and a control object for a new flush operation.
+#[cfg(test)]
+fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
+    let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
+    let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
+    let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
+
+    let request = FlushRequest {
+        slice,
+        offset,
+        ready_to_flush_rx,
+        done_flush_tx,
+    };
+    (request, control)
+}
+
+/// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
+#[cfg(test)]
+pub(crate) struct FlushControl {
+    not_started: FlushNotStarted,
+}
+
+#[cfg(not(test))]
+pub(crate) struct FlushControl;
+
+impl FlushControl {
+    #[cfg(test)]
+    fn not_started(
+        ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
+        done_flush_rx: tokio::sync::oneshot::Receiver<()>,
+    ) -> Self {
+        FlushControl {
+            not_started: FlushNotStarted {
+                ready_to_flush_tx,
+                done_flush_rx,
+            },
+        }
+    }
+
+    #[cfg(not(test))]
+    fn untracked() -> Self {
+        FlushControl
+    }
+
+    /// In tests, turns the flush control into a not-started state.
+    #[cfg(test)]
+    pub(crate) fn into_not_started(self) -> FlushNotStarted {
+        self.not_started
+    }
+
+    /// Releases control of the submitted buffer.
+    ///
+    /// In a `cfg(test)` environment, the buffer is guaranteed to be flushed to disk after [`FlushControl::release`] finishes execution.
+    pub async fn release(self) {
+        #[cfg(test)]
+        {
+            self.not_started
+                .ready_to_flush()
+                .wait_until_flush_is_done()
+                .await;
+        }
+    }
+}
+
+impl<Buf, W> FlushHandle<Buf, W>
+where
+    Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
+    W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
+{
+    /// Spawns a new background flush task and obtains a handle.
+    ///
+    /// Note: the duplex channel buffers the in-flight request, so we do not need to explicitly maintain a queue of buffers.
+    pub fn spawn_new<B>(
+        file: Arc<W>,
+        buf: B,
+        gate_guard: utils::sync::gate::GateGuard,
+        ctx: RequestContext,
+    ) -> Self
+    where
+        B: Buffer<IoBuf = Buf> + Send + 'static,
+    {
+        // It is fine to buffer up to only 1 message. We only have 1 message in-flight at a time.
+        let (front, back) = duplex::mpsc::channel(1);
+
+        let join_handle = tokio::spawn(async move {
+            FlushBackgroundTask::new(back, file, gate_guard, ctx)
+                .run(buf.flush())
+                .await
+        });
+
+        FlushHandle {
+            inner: Some(FlushHandleInner {
+                channel: front,
+                join_handle,
+            }),
+            maybe_flushed: None,
+        }
+    }
+
+    /// Submits a buffer to be flushed in the background task.
+    /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
+    /// The submitted buffer is saved in `Self::maybe_flushed` so that it can keep serving
+    /// reads while the flush is in progress.
+    pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
+    where
+        B: Buffer<IoBuf = Buf> + Send + 'static,
+    {
+        let slice = buf.flush();
+
+        // Saves a buffer for read while flushing. This also removes the reference to the old buffer.
+        self.maybe_flushed = Some(slice.cheap_clone());
+
+        let (request, flush_control) = new_flush_op(slice, offset);
+
+        // Submits the buffer to the background task.
+        let submit = self.inner_mut().channel.send(request).await;
+        if submit.is_err() {
+            return self.handle_error().await;
+        }
+
+        // Wait for an available buffer from the background flush task.
+        // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
+        // then the write path will eventually wait for it here.
+        let Some(recycled) = self.inner_mut().channel.recv().await else {
+            return self.handle_error().await;
+        };
+
+        // The only other place that could hold a reference to the recycled buffer
+        // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
+        let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
+        Ok((recycled, flush_control))
+    }
+
+    async fn handle_error<T>(&mut self) -> std::io::Result<T> {
+        Err(self
+            .shutdown()
+            .await
+            .expect_err("flush task only disconnects duplex if it exits with an error"))
+    }
+
+    /// Cleans up the channel and joins the flush task.
+    pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
+        let handle = self
+            .inner
+            .take()
+            .expect("must not use after we returned an error");
+        drop(handle.channel.tx);
+        handle.join_handle.await.unwrap()
+    }
+
+    /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
+    /// This only happens if the handle is used after an error.
+    fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
+        self.inner
+            .as_mut()
+            .expect("must not use after we returned an error")
+    }
+}
+
+/// A background task for flushing data to disk.
+pub struct FlushBackgroundTask<Buf, W> {
+    /// A bi-directional channel that receives (buffer, offset) for writes,
+    /// and sends back the recycled buffer.
+    channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
+    /// A writer for persisting data to disk.
+    writer: Arc<W>,
+    ctx: RequestContext,
+    /// Prevents the timeline from shutting down until the flush background task finishes flushing all remaining buffers to disk.
+    _gate_guard: utils::sync::gate::GateGuard,
+}
+
+impl<Buf, W> FlushBackgroundTask<Buf, W>
+where
+    Buf: IoBufAligned + Send + Sync,
+    W: OwnedAsyncWriter + Sync + 'static,
+{
+    /// Creates a new background flush task.
+    fn new(
+        channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
+        file: Arc<W>,
+        gate_guard: utils::sync::gate::GateGuard,
+        ctx: RequestContext,
+    ) -> Self {
+        FlushBackgroundTask {
+            channel,
+            writer: file,
+            _gate_guard: gate_guard,
+            ctx,
+        }
+    }
+
+    /// Runs the background flush task.
+    /// The passed-in slice is immediately sent back to the flush handle through the duplex channel.
+    async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
+        // Sends the extra buffer back to the handle.
+        self.channel.send(slice).await.map_err(|_| {
+            std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
+        })?;
+
+        // Exit condition: the channel is closed and there is no remaining buffer to be flushed.
+        while let Some(request) = self.channel.recv().await {
+            #[cfg(test)]
+            {
+                // In test, wait for control to signal that we are ready to flush.
+                if request.ready_to_flush_rx.await.is_err() {
+                    tracing::debug!("control dropped");
+                }
+            }
+
+            // Write slice to disk at `offset`.
+            let slice = self
+                .writer
+                .write_all_at(request.slice, request.offset, &self.ctx)
+                .await?;
+
+            #[cfg(test)]
+            {
+                // In test, tell control we are done flushing buffer.
+                if request.done_flush_tx.send(()).is_err() {
+                    tracing::debug!("control dropped");
+                }
+            }
+
+            // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
+            if self.channel.send(slice).await.is_err() {
+                // Although the channel is closed, we still need to finish flushing the remaining buffers.
+                continue;
+            }
+        }
+
+        Ok(self.writer)
+    }
+}
+
+#[cfg(test)]
+pub(crate) struct FlushNotStarted {
+    ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
+    done_flush_rx: tokio::sync::oneshot::Receiver<()>,
+}
+
+#[cfg(test)]
+pub(crate) struct FlushInProgress {
+    done_flush_rx: tokio::sync::oneshot::Receiver<()>,
+}
+
+#[cfg(test)]
+pub(crate) struct FlushDone;
+
+#[cfg(test)]
+impl FlushNotStarted {
+    /// Signals the background task that the buffer is ready to flush to disk.
+    pub fn ready_to_flush(self) -> FlushInProgress {
+        self.ready_to_flush_tx
+            .send(())
+            .map(|_| FlushInProgress {
+                done_flush_rx: self.done_flush_rx,
+            })
+            .unwrap()
+    }
+}
+
+#[cfg(test)]
+impl FlushInProgress {
+    /// Waits until background flush is done.
+    pub async fn wait_until_flush_is_done(self) -> FlushDone {
+        self.done_flush_rx.await.unwrap();
+        FlushDone
+    }
+}
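
The new `utils::sync::duplex::mpsc` channel is the backbone of the flush protocol above: the writer side sends a filled buffer (plus its offset) to the background task and then waits to receive a recycled buffer back, which is where the backpressure on the write path comes from. Below is a minimal sketch of that request/recycle round trip, not part of the patch: the `utils::sync::duplex` path follows the module added above; the `main` function, the `Vec<u8>` payload, and the placeholder "write to disk" step are purely illustrative.

use utils::sync::duplex;

#[tokio::main]
async fn main() {
    // Capacity 1: at most one request in flight, mirroring FlushHandle::spawn_new.
    let (mut front, mut back) = duplex::mpsc::channel::<Vec<u8>, Vec<u8>>(1);

    // Background "flush" task: receive a buffer, persist it, send it back for reuse.
    let task = tokio::spawn(async move {
        while let Some(buf) = back.recv().await {
            // ... write `buf` to disk here ...
            if back.send(buf).await.is_err() {
                break; // the foreground side is gone
            }
        }
    });

    front.send(vec![0u8; 8192]).await.unwrap();
    // Waiting for the recycled buffer is the backpressure point for the write path.
    let recycled = front.recv().await.unwrap();
    assert_eq!(recycled.len(), 8192);

    drop(front); // dropping our sender closes the channel and lets the background task exit
    task.await.unwrap();
}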