refactor(BufferedWriter): flush task owns the VirtualFile & abstraction for cleanup on drop (#11549)

Main change:

- `BufferedWriter` owns the `W`; no more `Arc<W>`
- We introduce auto-delete-on-drop wrappers for `VirtualFile`:
  - `TempVirtualFile` for write-only users
  - `TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter` for
    `EphemeralFile`, which requires read access to the immutable prefix of
    the file (see doc comments for details)
- Users of `BufferedWriter` hand it such a wrapped `VirtualFile`.
- The wrapped `VirtualFile` moves to the background flush task.
- On `BufferedWriter` shutdown, ownership of the wrapped file moves back to
  the caller.
- Callers remove the wrapper (`disarm_into_inner()`) after doing final
  touches, e.g., flushing index blocks and the summary for delta/image layer
  writers. The sketch below shows the overall flow.
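
A minimal sketch of that lifecycle, assuming the caller already has a `Gate`, a
`CancellationToken`, and a `RequestContext` in scope. The function name, buffer
size, span name, and payload are illustrative; the `TempVirtualFile` and
`BufferedWriter` APIs are the ones introduced in this PR:

```rust
use camino::Utf8Path;
use tokio_util::sync::CancellationToken;

use crate::context::RequestContext;
use crate::virtual_file::owned_buffers_io::write::BufferedWriter;
use crate::virtual_file::{IoBufferMut, TempVirtualFile, VirtualFile};

async fn write_temp_then_disarm(
    path: &Utf8Path,
    gate: &utils::sync::gate::Gate,
    cancel: CancellationToken,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // 1. Create the file and arm auto-delete-on-drop by wrapping it.
    let file = TempVirtualFile::new(VirtualFile::create(path, ctx).await?, gate.enter()?);

    // 2. Hand the wrapper to the writer; it moves into the background flush task.
    let mut writer = BufferedWriter::<IoBufferMut, _>::new(
        file,
        || IoBufferMut::with_capacity(64 * 1024),
        gate.enter()?,
        cancel,
        ctx,
        tracing::info_span!("sketch_buffered_writer"),
    );
    writer.write_buffered_borrowed(b"payload", ctx).await?;

    // 3. Shutdown moves ownership of the wrapper back to the caller.
    let (_bytes_submitted, file) = writer.shutdown(ctx).await?;

    // 4. Final touches go here (index blocks, summary, fsync, rename, ...).
    //    If anything above fails and we return early, dropping `file` unlinks
    //    the temp path.

    // 5. Disarm: from here on, the file is no longer deleted on drop.
    let _inner: VirtualFile = file.disarm_into_inner();
    Ok(())
}
```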

If the `BufferedWriter` isn't shut down properly via
`BufferedWriter::shutdown`, or if there is an error during the final
touches, the wrapper type ensures that the file gets unlinked.

We store a `GateGuard` inside the wrapper to ensure that the `Timeline` is
still alive when we unlink the file on drop.

Rust doesn't have async drop yet, so the unlinking happens via a synchronous
syscall (sketched below).
NB: we don't fsync the surrounding directory.
That was already the case before this PR; I believe it is correct because
all of these files live at temporary paths that get cleaned up on timeline
load.
Timeline load in turn does not need to fsync either, because the next
timeline load will unlink the file again if it reappears.
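
Condensed from the `TempVirtualFile` `Drop` impl added in this diff (the
`maybe_fatal_err` error classification is elided here), the drop path is
roughly:

```rust
impl Drop for TempVirtualFile {
    fn drop(&mut self) {
        // `None` means disarm_into_inner() already ran; nothing to clean up.
        let Some(Inner { file, _gate_guard }) = self.inner.take() else {
            return;
        };
        let path = file.path();
        // Synchronous unlink: Rust has no async Drop, and we deliberately do
        // not fsync the parent directory (temp paths are cleaned up on
        // timeline load anyway).
        if let Err(e) = std::fs::remove_file(path) {
            tracing::error!(err=%e, path=%path, "failed to remove temp file");
        }
        // The gate guard is released only after the unlink, keeping the
        // Timeline (and thus its directory) alive while we touch the path.
        drop(_gate_guard);
    }
}
```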

The auto-delete-on-drop can run after a higher-level mechanism has already
retried; if the retry reused the same path, the old instance's `Drop` would
unlink the retry's freshly created file.
Therefore, we switch all users to monotonically increasing, never-reused
temp file disambiguators (see the sketch below).
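
A sketch of that pattern, using a hypothetical shared helper (the PR actually
repeats a per-call-site `static`); the counter name and the example filename
formats are taken from the diff:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical shared helper: a process-wide monotonic counter. A value is
/// never handed out twice, so a retry can never pick the same temp path as a
/// predecessor whose TempVirtualFile is still waiting to run its Drop impl.
fn next_temp_disambiguator() -> u64 {
    static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
    NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, Ordering::Relaxed)
}

// Call sites embed the value in the temp filename, e.g.:
//   format!("ephemeral-{}", next_temp_disambiguator())
//   format!("{fname}.{:x}.{TEMP_FILE_SUFFIX}", next_temp_disambiguator())
```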

The two aspects above (unlink-on-drop without a directory fsync, and
never-reused temp file names) will receive further cleanup in a follow-up
task:
- https://github.com/neondatabase/neon/issues/11692

Drive-by changes:
- It turns out we can remove the two-pronged (std-fs vs. tokio-epoll-uring)
  code in the layer file download path.
  No need to make this a separate PR, because all of production has already
  been using `tokio-epoll-uring` with the buffered writer for many weeks.


Refs
- epic https://github.com/neondatabase/neon/issues/9868
- alternative to https://github.com/neondatabase/neon/pull/11544
Author: Christian Schwarz
Date:   2025-04-24 15:07:57 +02:00
Committed by: GitHub
Parent: 9d472c79ce
Commit: 9c6ff3aa2b
10 changed files with 348 additions and 315 deletions


@@ -28,7 +28,7 @@ use tracing::warn;
use crate::context::RequestContext; use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ; use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor; use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile; use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
@@ -218,7 +218,7 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer) /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping. /// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> { pub struct BlobWriter<const BUFFERED: bool> {
inner: VirtualFile, inner: TempVirtualFile,
offset: u64, offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true /// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>, buf: Vec<u8>,
@@ -228,7 +228,7 @@ pub struct BlobWriter<const BUFFERED: bool> {
impl<const BUFFERED: bool> BlobWriter<BUFFERED> { impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new( pub fn new(
inner: VirtualFile, inner: TempVirtualFile,
start_offset: u64, start_offset: u64,
_gate: &utils::sync::gate::Gate, _gate: &utils::sync::gate::Gate,
_cancel: CancellationToken, _cancel: CancellationToken,
@@ -476,30 +476,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
} }
} }
impl BlobWriter<true> { impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// Access the underlying `VirtualFile`. /// Finish this blob writer and return the underlying [`TempVirtualFile`].
/// ///
/// This function flushes the internal buffer before giving access /// If there is an internal buffer (depends on `BUFFERED`), it will
/// to the underlying `VirtualFile`. /// be flushed before this method returns.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> { pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<TempVirtualFile, Error> {
self.flush_buffer(ctx).await?; if BUFFERED {
self.flush_buffer(ctx).await?;
}
Ok(self.inner) Ok(self.inner)
} }
/// Access the underlying `VirtualFile`.
///
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
/// the internal buffer before giving access.
pub fn into_inner_no_flush(self) -> VirtualFile {
self.inner
}
}
impl BlobWriter<false> {
/// Access the underlying `VirtualFile`.
pub fn into_inner(self) -> VirtualFile {
self.inner
}
} }
#[cfg(test)] #[cfg(test)]
@@ -512,6 +499,7 @@ pub(crate) mod tests {
use crate::context::DownloadBehavior; use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind; use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef; use crate::tenant::block_io::BlockReaderRef;
use crate::virtual_file::VirtualFile;
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> { async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED>(blobs, false).await round_trip_test_compressed::<BUFFERED>(blobs, false).await
@@ -530,7 +518,10 @@ pub(crate) mod tests {
// Write part (in block to drop the file) // Write part (in block to drop the file)
let mut offsets = Vec::new(); let mut offsets = Vec::new();
{ {
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?; let file = TempVirtualFile::new(
VirtualFile::create(pathbuf.as_path(), ctx).await?,
gate.enter().unwrap(),
);
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx); let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
for blob in blobs.iter() { for blob in blobs.iter() {
let (_, res) = if compression { let (_, res) = if compression {
@@ -553,7 +544,9 @@ pub(crate) mod tests {
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await; let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
let offs = res?; let offs = res?;
println!("Writing final blob at offs={offs}"); println!("Writing final blob at offs={offs}");
wtr.flush_buffer(ctx).await?;
let file = wtr.into_inner(ctx).await?;
file.disarm_into_inner();
} }
Ok((temp_dir, pathbuf, offsets)) Ok((temp_dir, pathbuf, offsets))
} }


@@ -12,6 +12,7 @@ use tokio_epoll_uring::{BoundedBuf, Slice};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{error, info_span}; use tracing::{error, info_span};
use utils::id::TimelineId; use utils::id::TimelineId;
use utils::sync::gate::GateGuard;
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}; use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf; use crate::config::PageServerConf;
@@ -21,16 +22,33 @@ use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut; use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt; use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError}; use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io}; use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub struct EphemeralFile { pub struct EphemeralFile {
_tenant_shard_id: TenantShardId, _tenant_shard_id: TenantShardId,
_timeline_id: TimelineId, _timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId, page_cache_file_id: page_cache::FileId,
bytes_written: u64, bytes_written: u64,
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>, file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop) buffered_writer: BufferedWriter,
_gate_guard: utils::sync::gate::GateGuard, }
type BufferedWriter = owned_buffers_io::write::BufferedWriter<
IoBufferMut,
TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
>;
/// A TempVirtualFile that is co-owned by the [`EphemeralFile`]` and [`BufferedWriter`].
///
/// (Actually [`BufferedWriter`] internally is just a client to a background flush task.
/// The co-ownership is between [`EphemeralFile`] and that flush task.)
///
/// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
#[derive(Debug, Clone)]
struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
inner: Arc<TempVirtualFile>,
} }
const TAIL_SZ: usize = 64 * 1024; const TAIL_SZ: usize = 64 * 1024;
@@ -44,9 +62,12 @@ impl EphemeralFile {
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &RequestContext,
) -> anyhow::Result<EphemeralFile> { ) -> anyhow::Result<EphemeralFile> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1); // TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator = let filename_disambiguator =
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let filename = conf let filename = conf
.timeline_path(&tenant_shard_id, &timeline_id) .timeline_path(&tenant_shard_id, &timeline_id)
@@ -54,7 +75,7 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}" "ephemeral-{filename_disambiguator}"
))); )));
let file = Arc::new( let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
VirtualFile::open_with_options_v2( VirtualFile::open_with_options_v2(
&filename, &filename,
virtual_file::OpenOptions::new() virtual_file::OpenOptions::new()
@@ -64,6 +85,7 @@ impl EphemeralFile {
ctx, ctx,
) )
.await?, .await?,
gate.enter()?,
); );
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
@@ -73,7 +95,8 @@ impl EphemeralFile {
_timeline_id: timeline_id, _timeline_id: timeline_id,
page_cache_file_id, page_cache_file_id,
bytes_written: 0, bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new( file: file.clone(),
buffered_writer: BufferedWriter::new(
file, file,
|| IoBufferMut::with_capacity(TAIL_SZ), || IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?, gate.enter()?,
@@ -81,29 +104,42 @@ impl EphemeralFile {
ctx, ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename), info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
), ),
_gate_guard: gate.enter()?,
}) })
} }
} }
impl Drop for EphemeralFile { impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
fn drop(&mut self) { fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
// unlink the file Self {
// we are clear to do this, because we have entered a gate inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
let path = self.buffered_writer.as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
}
} }
} }
} }
impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
&self,
buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<
Output = (
owned_buffers_io::io_buf_ext::FullSlice<Buf>,
std::io::Result<()>,
),
> + Send {
self.inner.write_all_at(buf, offset, ctx)
}
}
impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
type Target = VirtualFile;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError { pub(crate) enum EphemeralFileWriteError {
#[error("{0}")] #[error("{0}")]
@@ -262,9 +298,9 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let mutable_range = Range(std::cmp::max(start, submitted_offset), end); let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
let dst = if written_range.len() > 0 { let dst = if written_range.len() > 0 {
let file: &VirtualFile = self.buffered_writer.as_inner();
let bounds = dst.bounds(); let bounds = dst.bounds();
let slice = file let slice = self
.file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx) .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?; .await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds) Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
@@ -456,7 +492,7 @@ mod tests {
assert_eq!(&buf, &content[range]); assert_eq!(&buf, &content[range]);
} }
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap(); let file_contents = std::fs::read(file.file.path()).unwrap();
assert!(file_contents == content[0..cap * 2]); assert!(file_contents == content[0..cap * 2]);
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap(); let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
@@ -489,7 +525,7 @@ mod tests {
// assert the state is as this test expects it to be // assert the state is as this test expects it to be
let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap(); let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]); assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
let md = file.buffered_writer.as_inner().path().metadata().unwrap(); let md = file.file.path().metadata().unwrap();
assert_eq!( assert_eq!(
md.len(), md.len(),
2 * cap.into_u64(), 2 * cap.into_u64(),


@@ -6,6 +6,7 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::future::Future; use std::future::Future;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::time::SystemTime; use std::time::SystemTime;
use anyhow::{Context, anyhow}; use anyhow::{Context, anyhow};
@@ -15,7 +16,7 @@ use remote_storage::{
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
}; };
use tokio::fs::{self, File, OpenOptions}; use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio::io::AsyncSeekExt;
use tokio_util::io::StreamReader; use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::warn; use tracing::warn;
@@ -40,7 +41,10 @@ use crate::span::{
use crate::tenant::Generation; use crate::tenant::Generation;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName; use crate::tenant::storage_layer::LayerName;
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error}; use crate::virtual_file;
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use crate::virtual_file::{IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{TempVirtualFile, owned_buffers_io};
/// ///
/// If 'metadata' is given, we will validate that the downloaded file's size matches that /// If 'metadata' is given, we will validate that the downloaded file's size matches that
@@ -72,21 +76,36 @@ pub async fn download_layer_file<'a>(
layer_metadata.generation, layer_metadata.generation,
); );
// Perform a rename inspired by durable_rename from file_utils.c. let (bytes_amount, temp_file) = download_retry(
// The sequence:
// write(tmp)
// fsync(tmp)
// rename(tmp, new)
// fsync(new)
// fsync(parent)
// For more context about durable_rename check this email from postgres mailing list:
// https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
let bytes_amount = download_retry(
|| async { || async {
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await // TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let temp_file_path = path_with_suffix_extension(
local_path,
&format!("{filename_disambiguator:x}.{TEMP_DOWNLOAD_EXTENSION}"),
);
let temp_file = TempVirtualFile::new(
// Not _v2 yet which is sensitive to virtual_file_io_mode.
// That'll happen in PR https://github.com/neondatabase/neon/pull/11558
VirtualFile::open_with_options(
&temp_file_path,
virtual_file::OpenOptions::new()
.create_new(true)
.write(true),
ctx,
)
.await
.with_context(|| format!("create a temp file for layer download: {temp_file_path}"))
.map_err(DownloadError::Other)?,
gate.enter().map_err(|_| DownloadError::Cancelled)?,
);
download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
}, },
&format!("download {remote_path:?}"), &format!("download {remote_path:?}"),
cancel, cancel,
@@ -96,7 +115,8 @@ pub async fn download_layer_file<'a>(
let expected = layer_metadata.file_size; let expected = layer_metadata.file_size;
if expected != bytes_amount { if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!( return Err(DownloadError::Other(anyhow!(
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}", "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",
temp_file.path()
))); )));
} }
@@ -106,11 +126,28 @@ pub async fn download_layer_file<'a>(
))) )))
}); });
fs::rename(&temp_file_path, &local_path) // Try rename before disarming the temp file.
// That way, if rename fails for whatever reason, we clean up the temp file on the return path.
fs::rename(temp_file.path(), &local_path)
.await .await
.with_context(|| format!("rename download layer file to {local_path}")) .with_context(|| format!("rename download layer file to {local_path}"))
.map_err(DownloadError::Other)?; .map_err(DownloadError::Other)?;
// The temp file's VirtualFile points to the temp_file_path which we moved above.
// Drop it immediately, it's invalid.
// This will get better in https://github.com/neondatabase/neon/issues/11692
let _: VirtualFile = temp_file.disarm_into_inner();
// NB: The gate guard that was stored in `temp_file` is dropped but we continue
// to operate on it and on the parent timeline directory.
// Those operations are safe to do because higher-level code is holding another gate guard:
// - attached mode: the download task spawned by struct Layer is holding the gate guard
// - secondary mode: The TenantDownloader::download holds the gate open
// The rename above is not durable yet.
// It doesn't matter for crash consistency because pageserver startup deletes temp
// files and we'll re-download on demand if necessary.
// We use fatal_err() below because the after the rename above, // We use fatal_err() below because the after the rename above,
// the in-memory state of the filesystem already has the layer file in its final place, // the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't. // and subsequent pageserver code could think it's durable while it really isn't.
@@ -146,147 +183,58 @@ pub async fn download_layer_file<'a>(
async fn download_object( async fn download_object(
storage: &GenericRemoteStorage, storage: &GenericRemoteStorage,
src_path: &RemotePath, src_path: &RemotePath,
dst_path: &Utf8PathBuf, destination_file: TempVirtualFile,
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate, gate: &utils::sync::gate::Gate,
cancel: &CancellationToken, cancel: &CancellationToken,
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext, ctx: &RequestContext,
) -> Result<u64, DownloadError> { ) -> Result<(u64, TempVirtualFile), DownloadError> {
let res = match crate::virtual_file::io_engine::get() { let mut download = storage
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"), .download(src_path, &DownloadOpts::default(), cancel)
crate::virtual_file::io_engine::IoEngine::StdFs => { .await?;
async {
let destination_file = tokio::fs::File::create(dst_path)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
let download = storage pausable_failpoint!("before-downloading-layer-stream-pausable");
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable"); let dst_path = destination_file.path().to_owned();
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
let mut buf_writer = // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file); // There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
let mut reader = tokio_util::io::StreamReader::new(download.download_stream); while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await {
let chunk = match res {
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?; Ok(chunk) => chunk,
buf_writer.flush().await?; Err(e) => return Err(DownloadError::from(e)),
};
let mut destination_file = buf_writer.into_inner(); buffered
.write_buffered_borrowed(&chunk, ctx)
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that: .await
// A file will not be closed immediately when it goes out of scope if there are any IO operations .map_err(|e| match e {
// that have not yet completed. To ensure that a file is closed immediately when it is dropped, FlushTaskError::Cancelled => DownloadError::Cancelled,
// you should call flush before dropping it. })?;
//
// From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
// we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
// But for additional safety lets check/wait for any pending operations.
destination_file
.flush()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("flush source file at {dst_path}"))
.map_err(DownloadError::Other)?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use std::sync::Arc;
use crate::virtual_file::{IoBufferMut, owned_buffers_io};
async {
let destination_file = Arc::new(
VirtualFile::create(dst_path, ctx)
.await
.with_context(|| {
format!("create a destination file for layer '{dst_path}'")
})
.map_err(DownloadError::Other)?,
);
let mut download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(DownloadError::from(e)),
};
buffered
.write_buffered_borrowed(&chunk, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered
.flush_and_into_inner(ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
}
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
};
// in case the download failed, clean up
match res {
Ok(bytes_amount) => Ok(bytes_amount),
Err(e) => {
if let Err(e) = tokio::fs::remove_file(dst_path).await {
if e.kind() != std::io::ErrorKind::NotFound {
on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
}
}
Err(e)
} }
let inner = buffered.shutdown(ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
} }
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok((bytes_amount, destination_file))
} }
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";


@@ -646,7 +646,7 @@ enum UpdateError {
NoData, NoData,
#[error("Insufficient local storage space")] #[error("Insufficient local storage space")]
NoSpace, NoSpace,
#[error("Failed to download")] #[error("Failed to download: {0}")]
DownloadError(DownloadError), DownloadError(DownloadError),
#[error(transparent)] #[error(transparent)]
Deserialize(#[from] serde_json::Error), Deserialize(#[from] serde_json::Error),


@@ -34,6 +34,7 @@ use std::ops::Range;
use std::os::unix::fs::FileExt; use std::os::unix::fs::FileExt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure}; use anyhow::{Context, Result, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf}; use camino::{Utf8Path, Utf8PathBuf};
@@ -45,8 +46,6 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId; use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value; use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf; use tokio_epoll_uring::IoBuf;
@@ -74,6 +73,7 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner, VectoredReadPlanner,
}; };
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
@@ -288,19 +288,20 @@ impl DeltaLayer {
key_start: Key, key_start: Key,
lsn_range: &Range<Lsn>, lsn_range: &Range<Lsn>,
) -> Utf8PathBuf { ) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng() // TempVirtualFile requires us to never reuse a filename while an old
.sample_iter(&Alphanumeric) // instance of TempVirtualFile created with that filename is not done dropping yet.
.take(8) // So, we use a monotonic counter to disambiguate the filenames.
.map(char::from) static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
.collect(); let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(tenant_shard_id, timeline_id) conf.timeline_path(tenant_shard_id, timeline_id)
.join(format!( .join(format!(
"{}-XXX__{:016X}-{:016X}.{}.{}", "{}-XXX__{:016X}-{:016X}.{:x}.{}",
key_start, key_start,
u64::from(lsn_range.start), u64::from(lsn_range.start),
u64::from(lsn_range.end), u64::from(lsn_range.end),
rand_string, filename_disambiguator,
TEMP_FILE_SUFFIX, TEMP_FILE_SUFFIX,
)) ))
} }
@@ -421,7 +422,7 @@ impl DeltaLayerWriterInner {
let path = let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range); DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let mut file = VirtualFile::create(&path, ctx).await?; let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?);
// make room for the header block // make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
@@ -515,22 +516,6 @@ impl DeltaLayerWriterInner {
self, self,
key_end: Key, key_end: Key,
ctx: &RequestContext, ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, ctx).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
async fn finish0(
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
@@ -598,6 +583,10 @@ impl DeltaLayerWriterInner {
trace!("created delta layer {}", self.path); trace!("created delta layer {}", self.path);
// The gate guard stored in `destination_file` is dropped. Callers (e.g.. flush loop or compaction)
// keep the gate open also, so that it's safe for them to rename the file to its final destination.
file.disarm_into_inner();
Ok((desc, self.path)) Ok((desc, self.path))
} }
} }
@@ -726,17 +715,6 @@ impl DeltaLayerWriter {
} }
} }
impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
// We want to remove the virtual file here, so it's fine to not
// having completely flushed unwritten data.
let vfile = inner.blob_writer.into_inner_no_flush();
vfile.remove();
}
}
}
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError { pub enum RewriteSummaryError {
#[error("magic mismatch")] #[error("magic mismatch")]
@@ -1609,8 +1587,8 @@ pub(crate) mod test {
use bytes::Bytes; use bytes::Bytes;
use itertools::MinMaxResult; use itertools::MinMaxResult;
use pageserver_api::value::Value; use pageserver_api::value::Value;
use rand::RngCore;
use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
use super::*; use super::*;
use crate::DEFAULT_PG_VERSION; use crate::DEFAULT_PG_VERSION;


@@ -32,6 +32,7 @@ use std::ops::Range;
use std::os::unix::prelude::FileExt; use std::os::unix::prelude::FileExt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure}; use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes; use bytes::Bytes;
@@ -43,8 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace; use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId}; use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value; use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
@@ -72,6 +71,7 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner, VectoredReadPlanner,
}; };
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
@@ -252,14 +252,18 @@ impl ImageLayer {
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
fname: &ImageLayerName, fname: &ImageLayerName,
) -> Utf8PathBuf { ) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng() // TempVirtualFile requires us to never reuse a filename while an old
.sample_iter(&Alphanumeric) // instance of TempVirtualFile created with that filename is not done dropping yet.
.take(8) // So, we use a monotonic counter to disambiguate the filenames.
.map(char::from) static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
.collect(); let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(&tenant_shard_id, &timeline_id) conf.timeline_path(&tenant_shard_id, &timeline_id)
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}")) .join(format!(
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
filename_disambiguator
))
} }
/// ///
@@ -773,7 +777,7 @@ impl ImageLayerWriterInner {
}, },
); );
trace!("creating image layer {}", path); trace!("creating image layer {}", path);
let mut file = { let mut file = TempVirtualFile::new(
VirtualFile::open_with_options( VirtualFile::open_with_options(
&path, &path,
virtual_file::OpenOptions::new() virtual_file::OpenOptions::new()
@@ -781,8 +785,9 @@ impl ImageLayerWriterInner {
.create_new(true), .create_new(true),
ctx, ctx,
) )
.await? .await?,
}; gate.enter()?,
);
// make room for the header block // make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
@@ -896,25 +901,6 @@ impl ImageLayerWriterInner {
self, self,
ctx: &RequestContext, ctx: &RequestContext,
end_key: Option<Key>, end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(ctx, end_key).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
///
/// Finish writing the image layer.
///
async fn finish0(
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
@@ -932,7 +918,7 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size); crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
}; };
let mut file = self.blob_writer.into_inner(); let mut file = self.blob_writer.into_inner(ctx).await?;
// Write out the index // Write out the index
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
@@ -1000,6 +986,10 @@ impl ImageLayerWriterInner {
trace!("created image layer {}", self.path); trace!("created image layer {}", self.path);
// The gate guard stored in `destination_file` is dropped. Callers (e.g.. flush loop or compaction)
// keep the gate open also, so that it's safe for them to rename the file to its final destination.
file.disarm_into_inner();
Ok((desc, self.path)) Ok((desc, self.path))
} }
} }
@@ -1125,14 +1115,6 @@ impl ImageLayerWriter {
} }
} }
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove();
}
}
}
pub struct ImageLayerIterator<'a> { pub struct ImageLayerIterator<'a> {
image_layer: &'a ImageLayerInner, image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext, ctx: &'a RequestContext,


@@ -25,29 +25,31 @@ use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlig
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut}; use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
use owned_buffers_io::io_buf_ext::FullSlice; use owned_buffers_io::io_buf_ext::FullSlice;
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
pub use pageserver_api::models::virtual_file as api;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant; use tokio::time::Instant;
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice}; use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use self::owned_buffers_io::write::OwnedAsyncWriter;
use crate::assert_u64_eq_usize::UsizeIsU64; use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::context::RequestContext; use crate::context::RequestContext;
use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation}; use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
use crate::page_cache::{PAGE_SZ, PageWriteGuard}; use crate::page_cache::{PAGE_SZ, PageWriteGuard};
pub(crate) mod io_engine;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub use io_engine::{ pub use io_engine::{
FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test, FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test,
io_engine_for_bench, io_engine_for_bench,
}; };
mod metadata;
mod open_options;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata; pub(crate) use metadata::Metadata;
pub(crate) use open_options::*; pub(crate) use open_options::*;
pub use pageserver_api::models::virtual_file as api;
pub use temporary::TempVirtualFile;
use self::owned_buffers_io::write::OwnedAsyncWriter; pub(crate) mod io_engine;
mod metadata;
mod open_options;
mod temporary;
pub(crate) mod owned_buffers_io { pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers. //! Abstractions for IO with owned buffers.
//! //!


@@ -1,5 +1,4 @@
mod flush; mod flush;
use std::sync::Arc;
pub(crate) use flush::FlushControl; pub(crate) use flush::FlushControl;
use flush::FlushHandle; use flush::FlushHandle;
@@ -41,7 +40,6 @@ pub trait OwnedAsyncWriter {
// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput, // TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
// since we would avoid copying majority of the data into the internal buffer. // since we would avoid copying majority of the data into the internal buffer.
pub struct BufferedWriter<B: Buffer, W> { pub struct BufferedWriter<B: Buffer, W> {
writer: Arc<W>,
/// Clone of the buffer that was last submitted to the flush loop. /// Clone of the buffer that was last submitted to the flush loop.
/// `None` if no flush request has been submitted, Some forever after. /// `None` if no flush request has been submitted, Some forever after.
pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>, pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
@@ -72,7 +70,7 @@ where
/// ///
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer. /// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
pub fn new( pub fn new(
writer: Arc<W>, writer: W,
buf_new: impl Fn() -> B, buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard, gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken, cancel: CancellationToken,
@@ -80,7 +78,6 @@ where
flush_task_span: tracing::Span, flush_task_span: tracing::Span,
) -> Self { ) -> Self {
Self { Self {
writer: writer.clone(),
mutable: Some(buf_new()), mutable: Some(buf_new()),
maybe_flushed: None, maybe_flushed: None,
flush_handle: FlushHandle::spawn_new( flush_handle: FlushHandle::spawn_new(
@@ -95,10 +92,6 @@ where
} }
} }
pub fn as_inner(&self) -> &W {
&self.writer
}
/// Returns the number of bytes submitted to the background flush task. /// Returns the number of bytes submitted to the background flush task.
pub fn bytes_submitted(&self) -> u64 { pub fn bytes_submitted(&self) -> u64 {
self.bytes_submitted self.bytes_submitted
@@ -116,20 +109,16 @@ where
} }
#[cfg_attr(target_os = "macos", allow(dead_code))] #[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn flush_and_into_inner( pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> {
mut self,
ctx: &RequestContext,
) -> Result<(u64, Arc<W>), FlushTaskError> {
self.flush(ctx).await?; self.flush(ctx).await?;
let Self { let Self {
mutable: buf, mutable: buf,
maybe_flushed: _, maybe_flushed: _,
writer,
mut flush_handle, mut flush_handle,
bytes_submitted: bytes_amount, bytes_submitted: bytes_amount,
} = self; } = self;
flush_handle.shutdown().await?; let writer = flush_handle.shutdown().await?;
assert!(buf.is_some()); assert!(buf.is_some());
Ok((bytes_amount, writer)) Ok((bytes_amount, writer))
} }
@@ -329,7 +318,7 @@ mod tests {
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> { async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
let ctx = test_ctx(); let ctx = test_ctx();
let ctx = &ctx; let ctx = &ctx;
let recorder = Arc::new(RecorderWriter::default()); let recorder = RecorderWriter::default();
let gate = utils::sync::gate::Gate::default(); let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
let mut writer = BufferedWriter::<_, RecorderWriter>::new( let mut writer = BufferedWriter::<_, RecorderWriter>::new(
@@ -350,7 +339,7 @@ mod tests {
writer.write_buffered_borrowed(b"j", ctx).await?; writer.write_buffered_borrowed(b"j", ctx).await?;
writer.write_buffered_borrowed(b"klmno", ctx).await?; writer.write_buffered_borrowed(b"klmno", ctx).await?;
let (_, recorder) = writer.flush_and_into_inner(ctx).await?; let (_, recorder) = writer.shutdown(ctx).await?;
assert_eq!( assert_eq!(
recorder.get_writes(), recorder.get_writes(),
{ {


@@ -1,5 +1,4 @@
use std::ops::ControlFlow; use std::ops::ControlFlow;
use std::sync::Arc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn}; use tracing::{Instrument, info, info_span, warn};
@@ -21,7 +20,7 @@ pub struct FlushHandleInner<Buf, W> {
/// and receives recyled buffer. /// and receives recyled buffer.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>, channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task. /// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>, join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
} }
struct FlushRequest<Buf> { struct FlushRequest<Buf> {
@@ -120,7 +119,7 @@ where
/// The queue depth is 1, and the passed-in `buf` seeds the queue depth. /// The queue depth is 1, and the passed-in `buf` seeds the queue depth.
/// I.e., the passed-in buf is immediately available to the handle as a recycled buffer. /// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
pub fn spawn_new<B>( pub fn spawn_new<B>(
file: Arc<W>, file: W,
buf: B, buf: B,
gate_guard: utils::sync::gate::GateGuard, gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken, cancel: CancellationToken,
@@ -183,7 +182,7 @@ where
} }
/// Cleans up the channel, join the flush task. /// Cleans up the channel, join the flush task.
pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> { pub async fn shutdown(&mut self) -> Result<W, FlushTaskError> {
let handle = self let handle = self
.inner .inner
.take() .take()
@@ -207,7 +206,7 @@ pub struct FlushBackgroundTask<Buf, W> {
/// and send back recycled buffer. /// and send back recycled buffer.
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>, channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
/// A writter for persisting data to disk. /// A writter for persisting data to disk.
writer: Arc<W>, writer: W,
ctx: RequestContext, ctx: RequestContext,
cancel: CancellationToken, cancel: CancellationToken,
/// Prevent timeline from shuting down until the flush background task finishes flushing all remaining buffers to disk. /// Prevent timeline from shuting down until the flush background task finishes flushing all remaining buffers to disk.
@@ -228,7 +227,7 @@ where
/// Creates a new background flush task. /// Creates a new background flush task.
fn new( fn new(
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>, channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
file: Arc<W>, file: W,
gate_guard: utils::sync::gate::GateGuard, gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken, cancel: CancellationToken,
ctx: RequestContext, ctx: RequestContext,
@@ -243,7 +242,7 @@ where
} }
/// Runs the background flush task. /// Runs the background flush task.
async fn run(mut self) -> Result<Arc<W>, FlushTaskError> { async fn run(mut self) -> Result<W, FlushTaskError> {
// Exit condition: channel is closed and there is no remaining buffer to be flushed // Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await { while let Some(request) = self.channel.recv().await {
#[cfg(test)] #[cfg(test)]


@@ -0,0 +1,106 @@
use tracing::error;
use utils::sync::gate::GateGuard;
use crate::context::RequestContext;
use super::{
MaybeFatalIo, VirtualFile,
owned_buffers_io::{
io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice, write::OwnedAsyncWriter,
},
};
/// A wrapper around [`super::VirtualFile`] that deletes the file on drop.
/// For use as a [`OwnedAsyncWriter`] in [`super::owned_buffers_io::write::BufferedWriter`].
#[derive(Debug)]
pub struct TempVirtualFile {
inner: Option<Inner>,
}
#[derive(Debug)]
struct Inner {
file: VirtualFile,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: GateGuard,
}
impl OwnedAsyncWriter for TempVirtualFile {
fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send {
VirtualFile::write_all_at(self, buf, offset, ctx)
}
}
impl Drop for TempVirtualFile {
fn drop(&mut self) {
let Some(Inner { file, _gate_guard }) = self.inner.take() else {
return;
};
let path = file.path();
if let Err(e) =
std::fs::remove_file(path).maybe_fatal_err("failed to remove the virtual file")
{
error!(err=%e, path=%path, "failed to remove");
}
drop(_gate_guard);
}
}
impl std::ops::Deref for TempVirtualFile {
type Target = VirtualFile;
fn deref(&self) -> &Self::Target {
&self
.inner
.as_ref()
.expect("only None after into_inner or drop")
.file
}
}
impl std::ops::DerefMut for TempVirtualFile {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self
.inner
.as_mut()
.expect("only None after into_inner or drop")
.file
}
}
impl TempVirtualFile {
/// The caller is responsible for ensuring that the path of `virtual_file` is not reused
/// until after this TempVirtualFile's `Drop` impl has completed.
/// Failure to do so will result in unlinking of the reused path by the original instance's Drop impl.
/// The best way to do so is by using a monotonic counter as a disambiguator.
/// TODO: centralize this disambiguator pattern inside this struct.
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
pub fn new(virtual_file: VirtualFile, gate_guard: GateGuard) -> Self {
Self {
inner: Some(Inner {
file: virtual_file,
_gate_guard: gate_guard,
}),
}
}
/// Dismantle this wrapper and return the underlying [`VirtualFile`].
/// This disables auto-unlinking functionality that is the essence of this wrapper.
///
/// The gate guard is dropped as well; it is the callers responsibility to ensure filesystem
/// operations after calls to this functions are still gated by some other gate guard.
///
/// TODO:
/// - centralize the common usage pattern of callers (sync_all(self), rename(self, dst), sync_all(dst.parent))
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
pub fn disarm_into_inner(mut self) -> VirtualFile {
self.inner
.take()
.expect("only None after into_inner or drop, and we are into_inner, and we consume")
.file
}
}