Patch summary (layout reconstructed: the original leading whitespace was not
recoverable, so the indentation below follows the visible brace nesting).

* metrics.rs: adds a `SetLen` variant to `StorageIoOperation`, labelled "set_len".
* remote_timeline_client/download.rs: in `download_object`, errors from reading
  the next chunk and from the buffered write are wrapped into
  `DownloadError::Other` with context. On shutdown the buffered writer's tail is
  zero-padded up to the next `PAGE_SZ` multiple (capped at the buffer capacity),
  and the file is then truncated with `set_len` to the byte count returned by
  `shutdown` minus that padding.
* virtual_file.rs: adds `VirtualFile::set_len`, which delegates to `self.inner`,
  and `VirtualFileInner::set_len`, which goes through `with_file!` using
  `StorageIoOperation::SetLen`.
* virtual_file/io_engine.rs: adds `IoEngine::set_len`; both `StdFs` and
  `TokioEpollUring` call `set_len` on the std file (no io_uring ftruncate yet).

NOTE(review): the context line `pub async fn metadata(&self) -> Result {` looks
like it lost its generic arguments before this text was captured; confirm it
against the target file before applying.

diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index f7afaae068..f3307ed5a9 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1206,6 +1206,7 @@ pub(crate) enum StorageIoOperation {
     Seek,
     Fsync,
     Metadata,
+    SetLen,
 }
 
 impl StorageIoOperation {
@@ -1220,6 +1221,7 @@ impl StorageIoOperation {
             StorageIoOperation::Seek => "seek",
             StorageIoOperation::Fsync => "fsync",
             StorageIoOperation::Metadata => "metadata",
+            StorageIoOperation::SetLen => "set_len",
         }
     }
 }
diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs
index c7de87e355..c59081c40e 100644
--- a/pageserver/src/tenant/remote_timeline_client/download.rs
+++ b/pageserver/src/tenant/remote_timeline_client/download.rs
@@ -32,12 +32,14 @@ use super::{
     remote_tenant_manifest_prefix, remote_tenant_path,
 };
 use crate::TEMP_FILE_SUFFIX;
+use crate::assert_u64_eq_usize::UsizeIsU64;
 use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::span::{
     debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
 };
 use crate::tenant::Generation;
+use crate::tenant::disk_btree::PAGE_SZ;
 use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
 use crate::tenant::storage_layer::LayerName;
 use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
@@ -241,12 +243,47 @@ async fn download_object(
                 {
                     let chunk = match res {
                         Ok(chunk) => chunk,
-                        Err(e) => return Err(e),
+                        Err(e) => return Err(DownloadError::Other(anyhow::Error::new(e).context("download next chunk"))),
                     };
-                    buffered.write_buffered_borrowed(&chunk, ctx).await?;
+                    buffered.write_buffered_borrowed(&chunk, ctx)
+                        .await
+                        .context("write_buffered_borrowed")
+                        .map_err(DownloadError::Other)?;
                 }
-                let inner = buffered.shutdown(|_| None).await?; // TODO: if we leave handle_tail=None here, it means we'll cut off layers that aren't a tail sz multiple
-                Ok(inner)
+                let mut pad_amount = None;
+                let (bytes_amount, destination_file) = buffered
+                    .shutdown(|mut buf| {
+                        use crate::virtual_file::owned_buffers_io::write::Buffer;
+
+                        let len = buf.pending();
+                        let cap = buf.cap();
+
+                        // pad zeros to the next io alignment requirement.
+                        // TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that.
+                        // We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here.
+                        // Need to find a better API that allows writing the format we intend to.
+                        let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
+                        pad_amount = Some(count);
+                        buf.extend_with(0, count);
+
+                        Some(buf)
+                    })
+                    .await
+                    .context("buffered writer shutdown")
+                    .map_err(DownloadError::Other)?;
+
+                let pad_amount = pad_amount.expect("shutdown always invokes the closure").into_u64();
+                let set_len_arg = bytes_amount - pad_amount;
+                destination_file
+                    .set_len(set_len_arg)
+                    .await
+                    .maybe_fatal_err("download_object set_len")
+                    .with_context(|| {
+                        format!("set len for file at {dst_path}: 0x{set_len_arg:x} = 0x{bytes_amount:x} - 0x{pad_amount:x}")
+                    })
+                    .map_err(DownloadError::Other)?;
+
+                Ok((set_len_arg, destination_file))
             }
             .await?;
 
diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs
index 63b1fa2189..63c1ccd4c8 100644
--- a/pageserver/src/virtual_file.rs
+++ b/pageserver/src/virtual_file.rs
@@ -185,6 +185,10 @@ impl VirtualFile {
         self.inner.sync_data().await
     }
 
+    pub async fn set_len(&self, len: u64) -> Result<(), Error> {
+        self.inner.set_len(len).await
+    }
+
     pub async fn metadata(&self) -> Result {
         self.inner.metadata().await
     }
@@ -669,6 +673,13 @@ impl VirtualFileInner {
         })
     }
 
+    pub async fn set_len(&self, len: u64) -> Result<(), Error> {
+        with_file!(self, StorageIoOperation::SetLen, |file_guard| {
+            let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
+            res.maybe_fatal_err("set_len")
+        })
+    }
+
     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     /// opens it and evicts some other File if necessary. The passed parameter is
     /// assumed to be a function available for the physical `File`.
diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs
index 758dd6e377..b7be243357 100644
--- a/pageserver/src/virtual_file/io_engine.rs
+++ b/pageserver/src/virtual_file/io_engine.rs
@@ -209,6 +209,22 @@ impl IoEngine {
             }
         }
     }
+
+    pub(super) async fn set_len(
+        &self,
+        file_guard: FileGuard,
+        len: u64,
+    ) -> (FileGuard, std::io::Result<()>) {
+        match self {
+            IoEngine::NotSet => panic!("not initialized"),
+            // TODO: ftruncate op for tokio-epoll-uring
+            IoEngine::StdFs | IoEngine::TokioEpollUring => {
+                let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
+                (file_guard, res)
+            }
+        }
+    }
+
     pub(super) async fn write_at(
         &self,
         file_guard: FileGuard,