mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 12:40:36 +00:00
@@ -384,7 +384,7 @@ mod tests {
|
||||
|
||||
let cap = file.buffered_writer.inspect_mutable().capacity();
|
||||
|
||||
let write_nbytes = cap + cap / 2;
|
||||
let write_nbytes = cap * 2 + cap / 2;
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
@@ -411,10 +411,13 @@ mod tests {
|
||||
}
|
||||
|
||||
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
|
||||
assert_eq!(file_contents, &content[0..cap]);
|
||||
assert!(file_contents == &content[0..cap] || file_contents == &content[0..cap * 2]);
|
||||
|
||||
let buffer_contents = file.buffered_writer.inspect_mutable();
|
||||
assert_eq!(buffer_contents, &content[cap..write_nbytes]);
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
|
||||
assert_eq!(maybe_flushed_buffer_contents, &content[cap..cap * 2]);
|
||||
|
||||
let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
|
||||
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -428,11 +431,12 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// mutable buffer and maybe_flushed buffer each has cap.
|
||||
let cap = file.buffered_writer.inspect_mutable().capacity();
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
.take(cap + cap / 2)
|
||||
.take(cap * 2 + cap / 2)
|
||||
.collect();
|
||||
|
||||
file.write_raw(&content, &ctx).await.unwrap();
|
||||
@@ -440,17 +444,20 @@ mod tests {
|
||||
// assert the state is as this test expects it to be
|
||||
assert_eq!(
|
||||
&file.load_to_io_buf(&ctx).await.unwrap(),
|
||||
&content[0..cap + cap / 2]
|
||||
&content[0..cap * 2 + cap / 2]
|
||||
);
|
||||
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
|
||||
assert!(
|
||||
md.len() == cap.into_u64() || md.len() == 2 * cap.into_u64(),
|
||||
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
|
||||
);
|
||||
assert_eq!(
|
||||
md.len(),
|
||||
cap.into_u64(),
|
||||
"buffered writer does one write if we write 1.5x buffer capacity"
|
||||
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
|
||||
&content[cap..cap * 2]
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.inspect_mutable()[0..cap / 2],
|
||||
&content[cap..cap + cap / 2]
|
||||
&content[cap * 2..cap * 2 + cap / 2]
|
||||
);
|
||||
}
|
||||
|
||||
@@ -475,7 +482,7 @@ mod tests {
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
.take(cap + cap / 2)
|
||||
.take(cap * 2 + cap / 2)
|
||||
.collect();
|
||||
|
||||
file.write_raw(&content, &ctx).await.unwrap();
|
||||
@@ -505,9 +512,17 @@ mod tests {
|
||||
test_read(cap - 10, 10).await;
|
||||
// read across file and buffer
|
||||
test_read(cap - 10, 20).await;
|
||||
// stay from start of buffer
|
||||
// stay from start of maybe flushed buffer
|
||||
test_read(cap, 10).await;
|
||||
// completely within buffer
|
||||
// completely within maybe flushed buffer
|
||||
test_read(cap + 10, 10).await;
|
||||
// border onto edge of maybe flushed buffer.
|
||||
test_read(cap * 2 - 10, 10).await;
|
||||
// read across maybe flushed and mutable buffer
|
||||
test_read(cap * 2 - 10, 20).await;
|
||||
// read across three segments
|
||||
test_read(cap - 10, cap + 20).await;
|
||||
// completely within mutable buffer
|
||||
test_read(cap * 2 + 10, 10).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ where
|
||||
let chunk_len = chunk.len();
|
||||
// avoid memcpy for the middle of the chunk
|
||||
if chunk.len() >= self.mutable().cap() {
|
||||
// TODO(yuchen): do we still want to keep this?
|
||||
// TODO(yuchen): do we still want to keep the bypass path?
|
||||
self.flush(ctx).await?;
|
||||
// do a big write, bypassing `buf`
|
||||
assert_eq!(
|
||||
|
||||
@@ -42,6 +42,7 @@ impl<S: Send, R: Send> Duplex<S, R> {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yuchen): special actions in drop to clean up the join handle?
|
||||
pub struct FlushHandleInner<Buf, W> {
|
||||
/// A bi-directional channel that sends (buffer, offset) for writes,
|
||||
/// and receives recyled buffer.
|
||||
|
||||
Reference in New Issue
Block a user