diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 6e36b97560..7f4bf6b9aa 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -384,7 +384,7 @@ mod tests { let cap = file.buffered_writer.inspect_mutable().capacity(); - let write_nbytes = cap + cap / 2; + let write_nbytes = cap * 2 + cap / 2; let content: Vec<u8> = rand::thread_rng() .sample_iter(rand::distributions::Standard) @@ -411,10 +411,13 @@ mod tests { } let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap(); - assert_eq!(file_contents, &content[0..cap]); + assert!(file_contents == &content[0..cap] || file_contents == &content[0..cap * 2]); - let buffer_contents = file.buffered_writer.inspect_mutable(); - assert_eq!(buffer_contents, &content[cap..write_nbytes]); + let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap(); + assert_eq!(maybe_flushed_buffer_contents, &content[cap..cap * 2]); + + let mutable_buffer_contents = file.buffered_writer.inspect_mutable(); + assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]); } #[tokio::test] @@ -428,11 +431,12 @@ mod tests { .await .unwrap(); + // mutable buffer and maybe_flushed buffer each have capacity `cap`. 
 let cap = file.buffered_writer.inspect_mutable().capacity(); let content: Vec<u8> = rand::thread_rng() .sample_iter(rand::distributions::Standard) - .take(cap + cap / 2) + .take(cap * 2 + cap / 2) .collect(); file.write_raw(&content, &ctx).await.unwrap(); @@ -440,17 +444,20 @@ mod tests { // assert the state is as this test expects it to be assert_eq!( &file.load_to_io_buf(&ctx).await.unwrap(), - &content[0..cap + cap / 2] + &content[0..cap * 2 + cap / 2] ); let md = file.buffered_writer.as_inner().path().metadata().unwrap(); + assert!( + md.len() == cap.into_u64() || md.len() == 2 * cap.into_u64(), + "buffered writer requires one write to be flushed if we write 2.5x buffer capacity" + ); assert_eq!( - md.len(), - cap.into_u64(), - "buffered writer does one write if we write 1.5x buffer capacity" + &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap], + &content[cap..cap * 2] ); assert_eq!( &file.buffered_writer.inspect_mutable()[0..cap / 2], - &content[cap..cap + cap / 2] + &content[cap * 2..cap * 2 + cap / 2] ); } @@ -475,7 +482,7 @@ mod tests { let content: Vec<u8> = rand::thread_rng() .sample_iter(rand::distributions::Standard) - .take(cap + cap / 2) + .take(cap * 2 + cap / 2) .collect(); file.write_raw(&content, &ctx).await.unwrap(); @@ -505,9 +512,17 @@ mod tests { test_read(cap - 10, 10).await; // read across file and buffer test_read(cap - 10, 20).await; - // stay from start of buffer + // stay from start of maybe flushed buffer test_read(cap, 10).await; - // completely within buffer + // completely within maybe flushed buffer test_read(cap + 10, 10).await; + // border onto edge of maybe flushed buffer. 
+ test_read(cap * 2 - 10, 10).await; + // read across maybe flushed and mutable buffer + test_read(cap * 2 - 10, 20).await; + // read across three segments + test_read(cap - 10, cap + 20).await; + // completely within mutable buffer + test_read(cap * 2 + 10, 10).await; } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index cf0157bd75..f0399681f1 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -121,7 +121,7 @@ where let chunk_len = chunk.len(); // avoid memcpy for the middle of the chunk if chunk.len() >= self.mutable().cap() { - // TODO(yuchen): do we still want to keep this? + // TODO(yuchen): do we still want to keep the bypass path? self.flush(ctx).await?; // do a big write, bypassing `buf` assert_eq!( diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 9b5f36a5d3..a9c1404712 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -42,6 +42,7 @@ impl Duplex { } } +// TODO(yuchen): special actions in drop to clean up the join handle? pub struct FlushHandleInner { /// A bi-directional channel that sends (buffer, offset) for writes, /// and receives recyled buffer.