mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 20:10:38 +00:00
use Arc around W: OwnedAsyncWriter
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
@@ -19,6 +19,7 @@ use tracing::error;
|
||||
|
||||
use std::io;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
pub struct EphemeralFile {
|
||||
@@ -51,15 +52,17 @@ impl EphemeralFile {
|
||||
"ephemeral-{filename_disambiguator}"
|
||||
)));
|
||||
|
||||
let file = VirtualFile::open_with_options(
|
||||
&filename,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let file = Arc::new(
|
||||
VirtualFile::open_with_options(
|
||||
&filename,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
@@ -206,10 +207,14 @@ async fn download_object<'a>(
|
||||
use crate::virtual_file::owned_buffers_io;
|
||||
use bytes::BytesMut;
|
||||
async {
|
||||
let destination_file = VirtualFile::create(dst_path, ctx)
|
||||
.await
|
||||
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
let destination_file = Arc::new(
|
||||
VirtualFile::create(dst_path, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("create a destination file for layer '{dst_path}'")
|
||||
})
|
||||
.map_err(DownloadError::Other)?,
|
||||
);
|
||||
|
||||
let mut download = storage
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
|
||||
@@ -32,7 +34,7 @@ pub trait OwnedAsyncWriter {
|
||||
/// In such cases, a different implementation that always buffers in memory
|
||||
/// may be preferable.
|
||||
pub struct BufferedWriter<B, W> {
|
||||
writer: W,
|
||||
writer: Arc<W>,
|
||||
/// invariant: always remains Some(buf) except
|
||||
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
/// - after an IO error => stays `None` forever
|
||||
@@ -48,7 +50,7 @@ where
|
||||
Buf: IoBuf + Send,
|
||||
W: OwnedAsyncWriter,
|
||||
{
|
||||
pub fn new(writer: W, buf: B) -> Self {
|
||||
pub fn new(writer: Arc<W>, buf: B) -> Self {
|
||||
Self {
|
||||
writer,
|
||||
buf: Some(buf),
|
||||
@@ -70,7 +72,10 @@ where
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn flush_and_into_inner(mut self, ctx: &RequestContext) -> std::io::Result<(u64, W)> {
|
||||
pub async fn flush_and_into_inner(
|
||||
mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(u64, Arc<W>)> {
|
||||
self.flush(ctx).await?;
|
||||
|
||||
let Self {
|
||||
@@ -290,7 +295,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_buffered_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let recorder = Arc::new(RecorderWriter::default());
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"b");
|
||||
@@ -307,7 +312,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let recorder = Arc::new(RecorderWriter::default());
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"abc");
|
||||
write!(writer, b"de");
|
||||
@@ -323,8 +328,9 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
let recorder = Arc::new(RecorderWriter::default());
|
||||
let mut writer =
|
||||
BufferedWriter::<_, RecorderWriter>::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"bc");
|
||||
write!(writer, b"d");
|
||||
@@ -341,8 +347,9 @@ mod tests {
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> {
|
||||
let ctx = test_ctx();
|
||||
let ctx = &ctx;
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
let recorder = Arc::new(RecorderWriter::default());
|
||||
let mut writer =
|
||||
BufferedWriter::<_, RecorderWriter>::new(recorder, BytesMut::with_capacity(2));
|
||||
|
||||
writer.write_buffered_borrowed(b"abc", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"d", ctx).await?;
|
||||
|
||||
Reference in New Issue
Block a user