diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 3483a9f31e..a147fe4468 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -28,7 +28,7 @@ use tracing::warn; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; -use crate::virtual_file::VirtualFile; +use crate::virtual_file::TempVirtualFile; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; #[derive(Copy, Clone, Debug)] @@ -218,7 +218,7 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10; /// discarded. You need to call [`flush_buffer`](Self::flush_buffer) /// manually before dropping. pub struct BlobWriter { - inner: VirtualFile, + inner: TempVirtualFile, offset: u64, /// A buffer to save on write calls, only used if BUFFERED=true buf: Vec, @@ -228,7 +228,7 @@ pub struct BlobWriter { impl BlobWriter { pub fn new( - inner: VirtualFile, + inner: TempVirtualFile, start_offset: u64, _gate: &utils::sync::gate::Gate, _cancel: CancellationToken, @@ -476,30 +476,17 @@ impl BlobWriter { } } -impl BlobWriter { - /// Access the underlying `VirtualFile`. +impl BlobWriter { + /// Finish this blob writer and return the underlying [`TempVirtualFile`]. /// - /// This function flushes the internal buffer before giving access - /// to the underlying `VirtualFile`. - pub async fn into_inner(mut self, ctx: &RequestContext) -> Result { - self.flush_buffer(ctx).await?; + /// If there is an internal buffer (depends on `BUFFERED`), it will + /// be flushed before this method returns. + pub async fn into_inner(mut self, ctx: &RequestContext) -> Result { + if BUFFERED { + self.flush_buffer(ctx).await?; + } Ok(self.inner) } - - /// Access the underlying `VirtualFile`. - /// - /// Unlike [`into_inner`](Self::into_inner), this doesn't flush - /// the internal buffer before giving access. 
- pub fn into_inner_no_flush(self) -> VirtualFile { - self.inner - } -} - -impl BlobWriter { - /// Access the underlying `VirtualFile`. - pub fn into_inner(self) -> VirtualFile { - self.inner - } } #[cfg(test)] @@ -512,6 +499,7 @@ pub(crate) mod tests { use crate::context::DownloadBehavior; use crate::task_mgr::TaskKind; use crate::tenant::block_io::BlockReaderRef; + use crate::virtual_file::VirtualFile; async fn round_trip_test(blobs: &[Vec]) -> Result<(), Error> { round_trip_test_compressed::(blobs, false).await @@ -530,7 +518,10 @@ pub(crate) mod tests { // Write part (in block to drop the file) let mut offsets = Vec::new(); { - let file = VirtualFile::create(pathbuf.as_path(), ctx).await?; + let file = TempVirtualFile::new( + VirtualFile::create(pathbuf.as_path(), ctx).await?, + gate.enter().unwrap(), + ); let mut wtr = BlobWriter::::new(file, 0, &gate, cancel.clone(), ctx); for blob in blobs.iter() { let (_, res) = if compression { @@ -553,7 +544,9 @@ pub(crate) mod tests { let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await; let offs = res?; println!("Writing final blob at offs={offs}"); - wtr.flush_buffer(ctx).await?; + + let file = wtr.into_inner(ctx).await?; + file.disarm_into_inner(); } Ok((temp_dir, pathbuf, offsets)) } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 19215bb918..be28ac725b 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -12,6 +12,7 @@ use tokio_epoll_uring::{BoundedBuf, Slice}; use tokio_util::sync::CancellationToken; use tracing::{error, info_span}; use utils::id::TimelineId; +use utils::sync::gate::GateGuard; use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}; use crate::config::PageServerConf; @@ -21,16 +22,33 @@ use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File; use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut; use 
crate::virtual_file::owned_buffers_io::slice::SliceMutExt; use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError}; -use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io}; +use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io}; + +use self::owned_buffers_io::write::OwnedAsyncWriter; pub struct EphemeralFile { _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, page_cache_file_id: page_cache::FileId, bytes_written: u64, - buffered_writer: owned_buffers_io::write::BufferedWriter, - /// Gate guard is held on as long as we need to do operations in the path (delete on drop) - _gate_guard: utils::sync::gate::GateGuard, + file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter, + buffered_writer: BufferedWriter, +} + +type BufferedWriter = owned_buffers_io::write::BufferedWriter< + IoBufferMut, + TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter, +>; + +/// A TempVirtualFile that is co-owned by the [`EphemeralFile`]` and [`BufferedWriter`]. +/// +/// (Actually [`BufferedWriter`] internally is just a client to a background flush task. +/// The co-ownership is between [`EphemeralFile`] and that flush task.) +/// +/// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`]. +#[derive(Debug, Clone)] +struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter { + inner: Arc, } const TAIL_SZ: usize = 64 * 1024; @@ -44,9 +62,12 @@ impl EphemeralFile { cancel: &CancellationToken, ctx: &RequestContext, ) -> anyhow::Result { - static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1); + // TempVirtualFile requires us to never reuse a filename while an old + // instance of TempVirtualFile created with that filename is not done dropping yet. + // So, we use a monotonic counter to disambiguate the filenames. 
+ static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1); let filename_disambiguator = - NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let filename = conf .timeline_path(&tenant_shard_id, &timeline_id) @@ -54,7 +75,7 @@ impl EphemeralFile { "ephemeral-{filename_disambiguator}" ))); - let file = Arc::new( + let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new( VirtualFile::open_with_options_v2( &filename, virtual_file::OpenOptions::new() @@ -64,6 +85,7 @@ impl EphemeralFile { ctx, ) .await?, + gate.enter()?, ); let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore @@ -73,7 +95,8 @@ impl EphemeralFile { _timeline_id: timeline_id, page_cache_file_id, bytes_written: 0, - buffered_writer: owned_buffers_io::write::BufferedWriter::new( + file: file.clone(), + buffered_writer: BufferedWriter::new( file, || IoBufferMut::with_capacity(TAIL_SZ), gate.enter()?, @@ -81,29 +104,42 @@ impl EphemeralFile { ctx, info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename), ), - _gate_guard: gate.enter()?, }) } } -impl Drop for EphemeralFile { - fn drop(&mut self) { - // unlink the file - // we are clear to do this, because we have entered a gate - let path = self.buffered_writer.as_inner().path(); - let res = std::fs::remove_file(path); - if let Err(e) = res { - if e.kind() != std::io::ErrorKind::NotFound { - // just never log the not found errors, we cannot do anything for them; on detach - // the tenant directory is already gone. 
- // - // not found files might also be related to https://github.com/neondatabase/neon/issues/2442 - error!("could not remove ephemeral file '{path}': {e}"); - } +impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter { + fn new(file: VirtualFile, gate_guard: GateGuard) -> Self { + Self { + inner: Arc::new(TempVirtualFile::new(file, gate_guard)), } } } +impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter { + fn write_all_at( + &self, + buf: owned_buffers_io::io_buf_ext::FullSlice, + offset: u64, + ctx: &RequestContext, + ) -> impl std::future::Future< + Output = ( + owned_buffers_io::io_buf_ext::FullSlice, + std::io::Result<()>, + ), + > + Send { + self.inner.write_all_at(buf, offset, ctx) + } +} + +impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter { + type Target = VirtualFile; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum EphemeralFileWriteError { #[error("{0}")] @@ -262,9 +298,9 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral let mutable_range = Range(std::cmp::max(start, submitted_offset), end); let dst = if written_range.len() > 0 { - let file: &VirtualFile = self.buffered_writer.as_inner(); let bounds = dst.bounds(); - let slice = file + let slice = self + .file .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx) .await?; Slice::from_buf_bounds(Slice::into_inner(slice), bounds) @@ -456,7 +492,7 @@ mod tests { assert_eq!(&buf, &content[range]); } - let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap(); + let file_contents = std::fs::read(file.file.path()).unwrap(); assert!(file_contents == content[0..cap * 2]); let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap(); @@ -489,7 +525,7 @@ mod tests { // assert the state is as this test expects it to be let load_io_buf_res = 
file.load_to_io_buf(&ctx).await.unwrap(); assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]); - let md = file.buffered_writer.as_inner().path().metadata().unwrap(); + let md = file.file.path().metadata().unwrap(); assert_eq!( md.len(), 2 * cap.into_u64(), diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 70f77ef9e8..19cf70a055 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -6,6 +6,7 @@ use std::collections::HashSet; use std::future::Future; use std::str::FromStr; +use std::sync::atomic::AtomicU64; use std::time::SystemTime; use anyhow::{Context, anyhow}; @@ -15,7 +16,7 @@ use remote_storage::{ DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, }; use tokio::fs::{self, File, OpenOptions}; -use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use tokio::io::AsyncSeekExt; use tokio_util::io::StreamReader; use tokio_util::sync::CancellationToken; use tracing::warn; @@ -40,7 +41,10 @@ use crate::span::{ use crate::tenant::Generation; use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerName; -use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error}; +use crate::virtual_file; +use crate::virtual_file::owned_buffers_io::write::FlushTaskError; +use crate::virtual_file::{IoBufferMut, MaybeFatalIo, VirtualFile}; +use crate::virtual_file::{TempVirtualFile, owned_buffers_io}; /// /// If 'metadata' is given, we will validate that the downloaded file's size matches that @@ -72,21 +76,36 @@ pub async fn download_layer_file<'a>( layer_metadata.generation, ); - // Perform a rename inspired by durable_rename from file_utils.c. 
- // The sequence: - // write(tmp) - // fsync(tmp) - // rename(tmp, new) - // fsync(new) - // fsync(parent) - // For more context about durable_rename check this email from postgres mailing list: - // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com - // If pageserver crashes the temp file will be deleted on startup and re-downloaded. - let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION); - - let bytes_amount = download_retry( + let (bytes_amount, temp_file) = download_retry( || async { - download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await + // TempVirtualFile requires us to never reuse a filename while an old + // instance of TempVirtualFile created with that filename is not done dropping yet. + // So, we use a monotonic counter to disambiguate the filenames. + static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1); + let filename_disambiguator = + NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let temp_file_path = path_with_suffix_extension( + local_path, + &format!("{filename_disambiguator:x}.{TEMP_DOWNLOAD_EXTENSION}"), + ); + + let temp_file = TempVirtualFile::new( + // Not _v2 yet which is sensitive to virtual_file_io_mode. 
+ // That'll happen in PR https://github.com/neondatabase/neon/pull/11558 + VirtualFile::open_with_options( + &temp_file_path, + virtual_file::OpenOptions::new() + .create_new(true) + .write(true), + ctx, + ) + .await + .with_context(|| format!("create a temp file for layer download: {temp_file_path}")) + .map_err(DownloadError::Other)?, + gate.enter().map_err(|_| DownloadError::Cancelled)?, + ); + download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await }, &format!("download {remote_path:?}"), cancel, @@ -96,7 +115,8 @@ pub async fn download_layer_file<'a>( let expected = layer_metadata.file_size; if expected != bytes_amount { return Err(DownloadError::Other(anyhow!( - "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}", + "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}", + temp_file.path() ))); } @@ -106,11 +126,28 @@ pub async fn download_layer_file<'a>( ))) }); - fs::rename(&temp_file_path, &local_path) + // Try rename before disarming the temp file. + // That way, if rename fails for whatever reason, we clean up the temp file on the return path. + + fs::rename(temp_file.path(), &local_path) .await .with_context(|| format!("rename download layer file to {local_path}")) .map_err(DownloadError::Other)?; + // The temp file's VirtualFile points to the temp_file_path which we moved above. + // Drop it immediately, it's invalid. + // This will get better in https://github.com/neondatabase/neon/issues/11692 + let _: VirtualFile = temp_file.disarm_into_inner(); + // NB: The gate guard that was stored in `temp_file` is dropped but we continue + // to operate on it and on the parent timeline directory. 
+ // Those operations are safe to do because higher-level code is holding another gate guard: + // - attached mode: the download task spawned by struct Layer is holding the gate guard + // - secondary mode: The TenantDownloader::download holds the gate open + + // The rename above is not durable yet. + // It doesn't matter for crash consistency because pageserver startup deletes temp + // files and we'll re-download on demand if necessary. + // We use fatal_err() below because the after the rename above, // the in-memory state of the filesystem already has the layer file in its final place, // and subsequent pageserver code could think it's durable while it really isn't. @@ -146,147 +183,58 @@ pub async fn download_layer_file<'a>( async fn download_object( storage: &GenericRemoteStorage, src_path: &RemotePath, - dst_path: &Utf8PathBuf, - #[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate, + destination_file: TempVirtualFile, + gate: &utils::sync::gate::Gate, cancel: &CancellationToken, - #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext, -) -> Result { - let res = match crate::virtual_file::io_engine::get() { - crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"), - crate::virtual_file::io_engine::IoEngine::StdFs => { - async { - let destination_file = tokio::fs::File::create(dst_path) - .await - .with_context(|| format!("create a destination file for layer '{dst_path}'")) - .map_err(DownloadError::Other)?; + ctx: &RequestContext, +) -> Result<(u64, TempVirtualFile), DownloadError> { + let mut download = storage + .download(src_path, &DownloadOpts::default(), cancel) + .await?; - let download = storage - .download(src_path, &DownloadOpts::default(), cancel) - .await?; + pausable_failpoint!("before-downloading-layer-stream-pausable"); - pausable_failpoint!("before-downloading-layer-stream-pausable"); + let dst_path = destination_file.path().to_owned(); + let mut buffered = 
owned_buffers_io::write::BufferedWriter::::new( + destination_file, + || IoBufferMut::with_capacity(super::BUFFER_SIZE), + gate.enter().map_err(|_| DownloadError::Cancelled)?, + cancel.child_token(), + ctx, + tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path), + ); - let mut buf_writer = - tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file); - - let mut reader = tokio_util::io::StreamReader::new(download.download_stream); - - let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?; - buf_writer.flush().await?; - - let mut destination_file = buf_writer.into_inner(); - - // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that: - // A file will not be closed immediately when it goes out of scope if there are any IO operations - // that have not yet completed. To ensure that a file is closed immediately when it is dropped, - // you should call flush before dropping it. - // - // From the tokio code I see that it waits for pending operations to complete. There shouldt be any because - // we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations. - // But for additional safety lets check/wait for any pending operations. 
- destination_file - .flush() - .await - .maybe_fatal_err("download_object sync_all") - .with_context(|| format!("flush source file at {dst_path}")) - .map_err(DownloadError::Other)?; - - // not using sync_data because it can lose file size update - destination_file - .sync_all() - .await - .maybe_fatal_err("download_object sync_all") - .with_context(|| format!("failed to fsync source file at {dst_path}")) - .map_err(DownloadError::Other)?; - - Ok(bytes_amount) - } - .await - } - #[cfg(target_os = "linux")] - crate::virtual_file::io_engine::IoEngine::TokioEpollUring => { - use crate::virtual_file::owned_buffers_io::write::FlushTaskError; - use std::sync::Arc; - - use crate::virtual_file::{IoBufferMut, owned_buffers_io}; - async { - let destination_file = Arc::new( - VirtualFile::create(dst_path, ctx) - .await - .with_context(|| { - format!("create a destination file for layer '{dst_path}'") - }) - .map_err(DownloadError::Other)?, - ); - - let mut download = storage - .download(src_path, &DownloadOpts::default(), cancel) - .await?; - - pausable_failpoint!("before-downloading-layer-stream-pausable"); - - let mut buffered = owned_buffers_io::write::BufferedWriter::::new( - destination_file, - || IoBufferMut::with_capacity(super::BUFFER_SIZE), - gate.enter().map_err(|_| DownloadError::Cancelled)?, - cancel.child_token(), - ctx, - tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path), - ); - - // TODO: use vectored write (writev) once supported by tokio-epoll-uring. - // There's chunks_vectored() on the stream. 
- let (bytes_amount, destination_file) = async { - while let Some(res) = - futures::StreamExt::next(&mut download.download_stream).await - { - let chunk = match res { - Ok(chunk) => chunk, - Err(e) => return Err(DownloadError::from(e)), - }; - buffered - .write_buffered_borrowed(&chunk, ctx) - .await - .map_err(|e| match e { - FlushTaskError::Cancelled => DownloadError::Cancelled, - })?; - } - let inner = buffered - .flush_and_into_inner(ctx) - .await - .map_err(|e| match e { - FlushTaskError::Cancelled => DownloadError::Cancelled, - })?; - Ok(inner) - } - .await?; - - // not using sync_data because it can lose file size update - destination_file - .sync_all() - .await - .maybe_fatal_err("download_object sync_all") - .with_context(|| format!("failed to fsync source file at {dst_path}")) - .map_err(DownloadError::Other)?; - - Ok(bytes_amount) - } - .await - } - }; - - // in case the download failed, clean up - match res { - Ok(bytes_amount) => Ok(bytes_amount), - Err(e) => { - if let Err(e) = tokio::fs::remove_file(dst_path).await { - if e.kind() != std::io::ErrorKind::NotFound { - on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}")); - } - } - Err(e) + // TODO: use vectored write (writev) once supported by tokio-epoll-uring. + // There's chunks_vectored() on the stream. 
+ let (bytes_amount, destination_file) = async { + while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await { + let chunk = match res { + Ok(chunk) => chunk, + Err(e) => return Err(DownloadError::from(e)), + }; + buffered + .write_buffered_borrowed(&chunk, ctx) + .await + .map_err(|e| match e { + FlushTaskError::Cancelled => DownloadError::Cancelled, + })?; } + let inner = buffered.shutdown(ctx).await.map_err(|e| match e { + FlushTaskError::Cancelled => DownloadError::Cancelled, + })?; + Ok(inner) } + .await?; + + // not using sync_data because it can lose file size update + destination_file + .sync_all() + .await + .maybe_fatal_err("download_object sync_all") + .with_context(|| format!("failed to fsync source file at {dst_path}")) + .map_err(DownloadError::Other)?; + + Ok((bytes_amount, destination_file)) } const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 60cf7ac79e..4338c4d4cf 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -646,7 +646,7 @@ enum UpdateError { NoData, #[error("Insufficient local storage space")] NoSpace, - #[error("Failed to download")] + #[error("Failed to download: {0}")] DownloadError(DownloadError), #[error(transparent)] Deserialize(#[from] serde_json::Error), diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 0654342a25..40d4cffef3 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -34,6 +34,7 @@ use std::ops::Range; use std::os::unix::fs::FileExt; use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use anyhow::{Context, Result, bail, ensure}; use camino::{Utf8Path, Utf8PathBuf}; @@ -45,8 +46,6 @@ use pageserver_api::keyspace::KeySpace; use 
pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::shard::TenantShardId; use pageserver_api::value::Value; -use rand::Rng; -use rand::distributions::Alphanumeric; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_epoll_uring::IoBuf; @@ -74,6 +73,7 @@ use crate::tenant::vectored_blob_io::{ BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner, }; +use crate::virtual_file::TempVirtualFile; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; @@ -288,19 +288,20 @@ impl DeltaLayer { key_start: Key, lsn_range: &Range, ) -> Utf8PathBuf { - let rand_string: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(8) - .map(char::from) - .collect(); + // TempVirtualFile requires us to never reuse a filename while an old + // instance of TempVirtualFile created with that filename is not done dropping yet. + // So, we use a monotonic counter to disambiguate the filenames. 
+ static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1); + let filename_disambiguator = + NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed); conf.timeline_path(tenant_shard_id, timeline_id) .join(format!( - "{}-XXX__{:016X}-{:016X}.{}.{}", + "{}-XXX__{:016X}-{:016X}.{:x}.{}", key_start, u64::from(lsn_range.start), u64::from(lsn_range.end), - rand_string, + filename_disambiguator, TEMP_FILE_SUFFIX, )) } @@ -421,7 +422,7 @@ impl DeltaLayerWriterInner { let path = DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range); - let mut file = VirtualFile::create(&path, ctx).await?; + let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?); // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); @@ -515,22 +516,6 @@ impl DeltaLayerWriterInner { self, key_end: Key, ctx: &RequestContext, - ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { - let temp_path = self.path.clone(); - let result = self.finish0(key_end, ctx).await; - if let Err(ref e) = result { - tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}"); - if let Err(e) = std::fs::remove_file(&temp_path) { - tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing"); - } - } - result - } - - async fn finish0( - self, - key_end: Key, - ctx: &RequestContext, ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; @@ -598,6 +583,10 @@ impl DeltaLayerWriterInner { trace!("created delta layer {}", self.path); + // The gate guard stored in `destination_file` is dropped. Callers (e.g.. flush loop or compaction) + // keep the gate open also, so that it's safe for them to rename the file to its final destination. 
+ file.disarm_into_inner(); + Ok((desc, self.path)) } } @@ -726,17 +715,6 @@ impl DeltaLayerWriter { } } -impl Drop for DeltaLayerWriter { - fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - // We want to remove the virtual file here, so it's fine to not - // having completely flushed unwritten data. - let vfile = inner.blob_writer.into_inner_no_flush(); - vfile.remove(); - } - } -} - #[derive(thiserror::Error, Debug)] pub enum RewriteSummaryError { #[error("magic mismatch")] @@ -1609,8 +1587,8 @@ pub(crate) mod test { use bytes::Bytes; use itertools::MinMaxResult; use pageserver_api::value::Value; - use rand::RngCore; use rand::prelude::{SeedableRng, SliceRandom, StdRng}; + use rand::{Rng, RngCore}; use super::*; use crate::DEFAULT_PG_VERSION; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 8ee4cdee66..22b5f96d17 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -32,6 +32,7 @@ use std::ops::Range; use std::os::unix::prelude::FileExt; use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use anyhow::{Context, Result, bail, ensure}; use bytes::Bytes; @@ -43,8 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key}; use pageserver_api::keyspace::KeySpace; use pageserver_api::shard::{ShardIdentity, TenantShardId}; use pageserver_api::value::Value; -use rand::Rng; -use rand::distributions::Alphanumeric; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_stream::StreamExt; @@ -72,6 +71,7 @@ use crate::tenant::vectored_blob_io::{ BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner, }; +use crate::virtual_file::TempVirtualFile; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, 
STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; @@ -252,14 +252,18 @@ impl ImageLayer { tenant_shard_id: TenantShardId, fname: &ImageLayerName, ) -> Utf8PathBuf { - let rand_string: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(8) - .map(char::from) - .collect(); + // TempVirtualFile requires us to never reuse a filename while an old + // instance of TempVirtualFile created with that filename is not done dropping yet. + // So, we use a monotonic counter to disambiguate the filenames. + static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1); + let filename_disambiguator = + NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed); conf.timeline_path(&tenant_shard_id, &timeline_id) - .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}")) + .join(format!( + "{fname}.{:x}.{TEMP_FILE_SUFFIX}", + filename_disambiguator + )) } /// @@ -773,7 +777,7 @@ impl ImageLayerWriterInner { }, ); trace!("creating image layer {}", path); - let mut file = { + let mut file = TempVirtualFile::new( VirtualFile::open_with_options( &path, virtual_file::OpenOptions::new() @@ -781,8 +785,9 @@ impl ImageLayerWriterInner { .create_new(true), ctx, ) - .await? - }; + .await?, + gate.enter()?, + ); // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); @@ -896,25 +901,6 @@ impl ImageLayerWriterInner { self, ctx: &RequestContext, end_key: Option, - ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { - let temp_path = self.path.clone(); - let result = self.finish0(ctx, end_key).await; - if let Err(ref e) = result { - tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}"); - if let Err(e) = std::fs::remove_file(&temp_path) { - tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing"); - } - } - result - } - - /// - /// Finish writing the image layer. 
- /// - async fn finish0( - self, - ctx: &RequestContext, - end_key: Option, ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; @@ -932,7 +918,7 @@ impl ImageLayerWriterInner { crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size); }; - let mut file = self.blob_writer.into_inner(); + let mut file = self.blob_writer.into_inner(ctx).await?; // Write out the index file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) @@ -1000,6 +986,10 @@ impl ImageLayerWriterInner { trace!("created image layer {}", self.path); + // The gate guard stored in `destination_file` is dropped. Callers (e.g.. flush loop or compaction) + // keep the gate open also, so that it's safe for them to rename the file to its final destination. + file.disarm_into_inner(); + Ok((desc, self.path)) } } @@ -1125,14 +1115,6 @@ impl ImageLayerWriter { } } -impl Drop for ImageLayerWriter { - fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - inner.blob_writer.into_inner().remove(); - } - } -} - pub struct ImageLayerIterator<'a> { image_layer: &'a ImageLayerInner, ctx: &'a RequestContext, diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 4eb968c340..e75500aa39 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -25,29 +25,31 @@ use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlig use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut}; use owned_buffers_io::io_buf_ext::FullSlice; use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; -pub use pageserver_api::models::virtual_file as api; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice}; +use self::owned_buffers_io::write::OwnedAsyncWriter; use crate::assert_u64_eq_usize::UsizeIsU64; use 
crate::context::RequestContext; use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation}; use crate::page_cache::{PAGE_SZ, PageWriteGuard}; -pub(crate) mod io_engine; + +pub(crate) use api::IoMode; +pub(crate) use io_engine::IoEngineKind; pub use io_engine::{ FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test, io_engine_for_bench, }; -mod metadata; -mod open_options; -pub(crate) use api::IoMode; -pub(crate) use io_engine::IoEngineKind; pub(crate) use metadata::Metadata; pub(crate) use open_options::*; +pub use pageserver_api::models::virtual_file as api; +pub use temporary::TempVirtualFile; -use self::owned_buffers_io::write::OwnedAsyncWriter; - +pub(crate) mod io_engine; +mod metadata; +mod open_options; +mod temporary; pub(crate) mod owned_buffers_io { //! Abstractions for IO with owned buffers. //! diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index f3ab2c285a..628f8d3afd 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -1,5 +1,4 @@ mod flush; -use std::sync::Arc; pub(crate) use flush::FlushControl; use flush::FlushHandle; @@ -41,7 +40,6 @@ pub trait OwnedAsyncWriter { // TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput, // since we would avoid copying majority of the data into the internal buffer. pub struct BufferedWriter { - writer: Arc, /// Clone of the buffer that was last submitted to the flush loop. /// `None` if no flush request has been submitted, Some forever after. pub(super) maybe_flushed: Option>, @@ -72,7 +70,7 @@ where /// /// The `buf_new` function provides a way to initialize the owned buffers used by this writer. 
pub fn new( - writer: Arc, + writer: W, buf_new: impl Fn() -> B, gate_guard: utils::sync::gate::GateGuard, cancel: CancellationToken, @@ -80,7 +78,6 @@ where flush_task_span: tracing::Span, ) -> Self { Self { - writer: writer.clone(), mutable: Some(buf_new()), maybe_flushed: None, flush_handle: FlushHandle::spawn_new( @@ -95,10 +92,6 @@ where } } - pub fn as_inner(&self) -> &W { - &self.writer - } - /// Returns the number of bytes submitted to the background flush task. pub fn bytes_submitted(&self) -> u64 { self.bytes_submitted @@ -116,20 +109,16 @@ where } #[cfg_attr(target_os = "macos", allow(dead_code))] - pub async fn flush_and_into_inner( - mut self, - ctx: &RequestContext, - ) -> Result<(u64, Arc), FlushTaskError> { + pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> { self.flush(ctx).await?; let Self { mutable: buf, maybe_flushed: _, - writer, mut flush_handle, bytes_submitted: bytes_amount, } = self; - flush_handle.shutdown().await?; + let writer = flush_handle.shutdown().await?; assert!(buf.is_some()); Ok((bytes_amount, writer)) } @@ -329,7 +318,7 @@ mod tests { async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> { let ctx = test_ctx(); let ctx = &ctx; - let recorder = Arc::new(RecorderWriter::default()); + let recorder = RecorderWriter::default(); let gate = utils::sync::gate::Gate::default(); let cancel = CancellationToken::new(); let mut writer = BufferedWriter::<_, RecorderWriter>::new( @@ -350,7 +339,7 @@ mod tests { writer.write_buffered_borrowed(b"j", ctx).await?; writer.write_buffered_borrowed(b"klmno", ctx).await?; - let (_, recorder) = writer.flush_and_into_inner(ctx).await?; + let (_, recorder) = writer.shutdown(ctx).await?; assert_eq!( recorder.get_writes(), { diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index c076ba0eca..a522b807f3 100644 --- 
a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -1,5 +1,4 @@ use std::ops::ControlFlow; -use std::sync::Arc; use tokio_util::sync::CancellationToken; use tracing::{Instrument, info, info_span, warn}; @@ -21,7 +20,7 @@ pub struct FlushHandleInner { /// and receives recyled buffer. channel: duplex::mpsc::Duplex, FullSlice>, /// Join handle for the background flush task. - join_handle: tokio::task::JoinHandle, FlushTaskError>>, + join_handle: tokio::task::JoinHandle>, } struct FlushRequest { @@ -120,7 +119,7 @@ where /// The queue depth is 1, and the passed-in `buf` seeds the queue depth. /// I.e., the passed-in buf is immediately available to the handle as a recycled buffer. pub fn spawn_new( - file: Arc, + file: W, buf: B, gate_guard: utils::sync::gate::GateGuard, cancel: CancellationToken, @@ -183,7 +182,7 @@ where } /// Cleans up the channel, join the flush task. - pub async fn shutdown(&mut self) -> Result, FlushTaskError> { + pub async fn shutdown(&mut self) -> Result { let handle = self .inner .take() @@ -207,7 +206,7 @@ pub struct FlushBackgroundTask { /// and send back recycled buffer. channel: duplex::mpsc::Duplex, FlushRequest>, /// A writter for persisting data to disk. - writer: Arc, + writer: W, ctx: RequestContext, cancel: CancellationToken, /// Prevent timeline from shuting down until the flush background task finishes flushing all remaining buffers to disk. @@ -228,7 +227,7 @@ where /// Creates a new background flush task. fn new( channel: duplex::mpsc::Duplex, FlushRequest>, - file: Arc, + file: W, gate_guard: utils::sync::gate::GateGuard, cancel: CancellationToken, ctx: RequestContext, @@ -243,7 +242,7 @@ where } /// Runs the background flush task. 
- async fn run(mut self) -> Result, FlushTaskError> { + async fn run(mut self) -> Result { // Exit condition: channel is closed and there is no remaining buffer to be flushed while let Some(request) = self.channel.recv().await { #[cfg(test)] diff --git a/pageserver/src/virtual_file/temporary.rs b/pageserver/src/virtual_file/temporary.rs new file mode 100644 index 0000000000..c4af1f6b22 --- /dev/null +++ b/pageserver/src/virtual_file/temporary.rs @@ -0,0 +1,106 @@ +use tracing::error; +use utils::sync::gate::GateGuard; + +use crate::context::RequestContext; + +use super::{ + MaybeFatalIo, VirtualFile, + owned_buffers_io::{ + io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice, write::OwnedAsyncWriter, + }, +}; + +/// A wrapper around [`super::VirtualFile`] that deletes the file on drop. +/// For use as an [`OwnedAsyncWriter`] in [`super::owned_buffers_io::write::BufferedWriter`]. +#[derive(Debug)] +pub struct TempVirtualFile { + inner: Option, +} + +#[derive(Debug)] +struct Inner { + file: VirtualFile, + /// Gate guard is held for as long as we need to do operations on the path (delete on drop) + _gate_guard: GateGuard, +} + +impl OwnedAsyncWriter for TempVirtualFile { + fn write_all_at( + &self, + buf: FullSlice, + offset: u64, + ctx: &RequestContext, + ) -> impl std::future::Future, std::io::Result<()>)> + Send { + VirtualFile::write_all_at(self, buf, offset, ctx) + } +} + +impl Drop for TempVirtualFile { + fn drop(&mut self) { + let Some(Inner { file, _gate_guard }) = self.inner.take() else { + return; + }; + let path = file.path(); + if let Err(e) = + std::fs::remove_file(path).maybe_fatal_err("failed to remove the virtual file") + { + error!(err=%e, path=%path, "failed to remove"); + } + drop(_gate_guard); + } +} + +impl std::ops::Deref for TempVirtualFile { + type Target = VirtualFile; + + fn deref(&self) -> &Self::Target { + &self + .inner + .as_ref() + .expect("only None after into_inner or drop") + .file + } +} + +impl std::ops::DerefMut for TempVirtualFile {
+ fn deref_mut(&mut self) -> &mut Self::Target { + &mut self + .inner + .as_mut() + .expect("only None after into_inner or drop") + .file + } +} + +impl TempVirtualFile { + /// The caller is responsible for ensuring that the path of `virtual_file` is not reused + /// until after this TempVirtualFile's `Drop` impl has completed. + /// Failure to do so will result in unlinking of the reused path by the original instance's Drop impl. + /// The best way to do so is by using a monotonic counter as a disambiguator. + /// TODO: centralize this disambiguator pattern inside this struct. + /// => + pub fn new(virtual_file: VirtualFile, gate_guard: GateGuard) -> Self { + Self { + inner: Some(Inner { + file: virtual_file, + _gate_guard: gate_guard, + }), + } + } + + /// Dismantle this wrapper and return the underlying [`VirtualFile`]. + /// This disables auto-unlinking functionality that is the essence of this wrapper. + /// + /// The gate guard is dropped as well; it is the caller's responsibility to ensure filesystem + /// operations after calls to this function are still gated by some other gate guard. + /// + /// TODO: + /// - centralize the common usage pattern of callers (sync_all(self), rename(self, dst), sync_all(dst.parent)) + /// => + pub fn disarm_into_inner(mut self) -> VirtualFile { + self.inner + .take() + .expect("only None after into_inner or drop, and we are into_inner, and we consume") + .file + } +}