mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
@@ -138,7 +138,7 @@ impl EphemeralFile {
|
||||
Ok(pos)
|
||||
}
|
||||
|
||||
pub(crate) async fn write_raw_controlled(
|
||||
async fn write_raw_controlled(
|
||||
&mut self,
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
@@ -552,8 +552,9 @@ mod tests {
|
||||
// completely within the file range
|
||||
assert!(align < cap, "test assumption");
|
||||
assert!(cap % align == 0);
|
||||
let not_started = control.unwrap().into_not_started();
|
||||
|
||||
// test reads at different flush stages.
|
||||
let not_started = control.unwrap().into_not_started();
|
||||
test_read_all_offset_combinations().await;
|
||||
let in_progress = not_started.ready_to_flush();
|
||||
test_read_all_offset_combinations().await;
|
||||
|
||||
@@ -201,7 +201,7 @@ where
|
||||
}
|
||||
|
||||
/// In addition to bytes submitted in this write, also returns a handle that can control the flush behavior.
|
||||
pub async fn write_buffered_borrowed_controlled(
|
||||
pub(crate) async fn write_buffered_borrowed_controlled(
|
||||
&mut self,
|
||||
mut chunk: &[u8],
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -35,6 +35,7 @@ struct FlushRequest<Buf> {
|
||||
done_flush_tx: tokio::sync::oneshot::Sender<()>,
|
||||
}
|
||||
|
||||
/// Constructs a request and a control object for a new flush operation.
|
||||
#[cfg(not(test))]
|
||||
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
|
||||
let request = FlushRequest { slice, offset };
|
||||
@@ -43,6 +44,7 @@ fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>,
|
||||
(request, control)
|
||||
}
|
||||
|
||||
/// Constructs a request and a control object for a new flush operation.
|
||||
#[cfg(test)]
|
||||
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
|
||||
let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
|
||||
@@ -59,12 +61,15 @@ fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>,
|
||||
}
|
||||
|
||||
pub enum FlushStartState {
|
||||
/// The submitted buffer's flush state is not tracked.
|
||||
#[cfg(not(test))]
|
||||
Untracked,
|
||||
/// The submitted buffer has not been flushed to disk.
|
||||
#[cfg(test)]
|
||||
NotStarted(FlushNotStarted),
|
||||
}
|
||||
|
||||
/// A control object that manipulates buffer's flush behehavior.
|
||||
pub(crate) struct FlushControl {
|
||||
state: FlushStartState,
|
||||
}
|
||||
@@ -82,6 +87,7 @@ impl FlushControl {
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
fn untracked() -> Self {
|
||||
FlushControl {
|
||||
@@ -97,6 +103,9 @@ impl FlushControl {
|
||||
}
|
||||
}
|
||||
|
||||
/// Release control to the submitted buffer.
|
||||
///
|
||||
/// In `cfg(test)` environment, the buffer is guranteed to be flushed to disk after [`FlushControl::release`] is finishes execution.
|
||||
pub async fn release(self) {
|
||||
match self.state {
|
||||
#[cfg(not(test))]
|
||||
@@ -287,6 +296,7 @@ pub(crate) struct FlushDone;
|
||||
|
||||
#[cfg(test)]
|
||||
impl FlushNotStarted {
|
||||
/// Signals the background task the buffer is ready to flush to disk.
|
||||
pub fn ready_to_flush(self) -> FlushInProgress {
|
||||
self.ready_to_flush_tx
|
||||
.send(())
|
||||
@@ -299,6 +309,7 @@ impl FlushNotStarted {
|
||||
|
||||
#[cfg(test)]
|
||||
impl FlushInProgress {
|
||||
/// Waits until background flush is done.
|
||||
pub async fn wait_until_flush_is_done(self) -> FlushDone {
|
||||
self.done_flush_rx.await.unwrap();
|
||||
FlushDone
|
||||
|
||||
Reference in New Issue
Block a user