diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 961407509e..5f7e86f364 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -138,7 +138,7 @@ impl EphemeralFile { Ok(pos) } - pub(crate) async fn write_raw_controlled( + async fn write_raw_controlled( &mut self, srcbuf: &[u8], ctx: &RequestContext, @@ -552,8 +552,9 @@ mod tests { // completely within the file range assert!(align < cap, "test assumption"); assert!(cap % align == 0); - let not_started = control.unwrap().into_not_started(); + // test reads at different flush stages. + let not_started = control.unwrap().into_not_started(); test_read_all_offset_combinations().await; let in_progress = not_started.ready_to_flush(); test_read_all_offset_combinations().await; diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 99e4dc4c76..84bf32b684 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -201,7 +201,7 @@ where } /// In addition to bytes submitted in this write, also returns a handle that can control the flush behavior. - pub async fn write_buffered_borrowed_controlled( + pub(crate) async fn write_buffered_borrowed_controlled( &mut self, mut chunk: &[u8], ctx: &RequestContext, diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 84e569baf7..585c70218b 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -35,6 +35,7 @@ struct FlushRequest { done_flush_tx: tokio::sync::oneshot::Sender<()>, } +/// Constructs a request and a control object for a new flush operation. #[cfg(not(test))] fn new_flush_op(slice: FullSlice, offset: u64) -> (FlushRequest, FlushControl) { let request = FlushRequest { slice, offset }; @@ -43,6 +44,7 @@ fn new_flush_op(slice: FullSlice, offset: u64) -> (FlushRequest, (request, control) } +/// Constructs a request and a control object for a new flush operation. #[cfg(test)] fn new_flush_op(slice: FullSlice, offset: u64) -> (FlushRequest, FlushControl) { let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel(); @@ -59,12 +61,15 @@ fn new_flush_op(slice: FullSlice, offset: u64) -> (FlushRequest, } pub enum FlushStartState { + /// The submitted buffer's flush state is not tracked. #[cfg(not(test))] Untracked, + /// The submitted buffer has not been flushed to disk. #[cfg(test)] NotStarted(FlushNotStarted), } +/// A control object that manipulates buffer's flush behehavior. pub(crate) struct FlushControl { state: FlushStartState, } @@ -82,6 +87,7 @@ impl FlushControl { }), } } + #[cfg(not(test))] fn untracked() -> Self { FlushControl { @@ -97,6 +103,9 @@ impl FlushControl { } } + /// Release control to the submitted buffer. + /// + /// In `cfg(test)` environment, the buffer is guranteed to be flushed to disk after [`FlushControl::release`] is finishes execution. pub async fn release(self) { match self.state { #[cfg(not(test))] @@ -287,6 +296,7 @@ pub(crate) struct FlushDone; #[cfg(test)] impl FlushNotStarted { + /// Signals the background task the buffer is ready to flush to disk. pub fn ready_to_flush(self) -> FlushInProgress { self.ready_to_flush_tx .send(()) @@ -299,6 +309,7 @@ impl FlushNotStarted { #[cfg(test)] impl FlushInProgress { + /// Waits until background flush is done. pub async fn wait_until_flush_is_done(self) -> FlushDone { self.done_flush_rx.await.unwrap(); FlushDone