mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
fix download usage of buffered writer (using pad + set_len strategy)
this fixes tenant::timeline::tests::test_heatmap_generation
This commit is contained in:
@@ -1206,6 +1206,7 @@ pub(crate) enum StorageIoOperation {
|
||||
Seek,
|
||||
Fsync,
|
||||
Metadata,
|
||||
SetLen,
|
||||
}
|
||||
|
||||
impl StorageIoOperation {
|
||||
@@ -1220,6 +1221,7 @@ impl StorageIoOperation {
|
||||
StorageIoOperation::Seek => "seek",
|
||||
StorageIoOperation::Fsync => "fsync",
|
||||
StorageIoOperation::Metadata => "metadata",
|
||||
StorageIoOperation::SetLen => "set_len",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,12 +32,14 @@ use super::{
|
||||
remote_tenant_manifest_prefix, remote_tenant_path,
|
||||
};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use crate::assert_u64_eq_usize::UsizeIsU64;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
|
||||
};
|
||||
use crate::tenant::Generation;
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
|
||||
@@ -241,12 +243,47 @@ async fn download_object(
|
||||
{
|
||||
let chunk = match res {
|
||||
Ok(chunk) => chunk,
|
||||
Err(e) => return Err(e),
|
||||
Err(e) => return Err(DownloadError::Other(anyhow::Error::new(e).context("download next chunk"))),
|
||||
};
|
||||
buffered.write_buffered_borrowed(&chunk, ctx).await?;
|
||||
buffered.write_buffered_borrowed(&chunk, ctx)
|
||||
.await
|
||||
.context("write_buffered_borrowed")
|
||||
.map_err(DownloadError::Other)?;
|
||||
}
|
||||
let inner = buffered.shutdown(|_| None).await?; // TODO: if we leave handle_tail=None here, it means we'll cut off layers that aren't a tail sz multiple
|
||||
Ok(inner)
|
||||
let mut pad_amount = None;
|
||||
let (bytes_amount, destination_file) = buffered
|
||||
.shutdown(|mut buf| {
|
||||
use crate::virtual_file::owned_buffers_io::write::Buffer;
|
||||
|
||||
let len = buf.pending();
|
||||
let cap = buf.cap();
|
||||
|
||||
// pad zeros to the next io alignment requirement.
|
||||
// TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that.
|
||||
// We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here.
|
||||
// Need to find a better API that allows writing the format we intend to.
|
||||
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
|
||||
pad_amount = Some(count);
|
||||
buf.extend_with(0, count);
|
||||
|
||||
Some(buf)
|
||||
})
|
||||
.await
|
||||
.context("buffered writer shutdown")
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let pad_amount = pad_amount.expect("shutdown always invokes the closure").into_u64();
|
||||
let set_len_arg = bytes_amount - pad_amount;
|
||||
destination_file
|
||||
.set_len(set_len_arg)
|
||||
.await
|
||||
.maybe_fatal_err("download_object set_len")
|
||||
.with_context(|| {
|
||||
format!("set len for file at {dst_path}: 0x{set_len_arg:x} = 0x{bytes_amount:x} - 0x{pad_amount:x}")
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok((set_len_arg, destination_file))
|
||||
}
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -185,6 +185,10 @@ impl VirtualFile {
|
||||
self.inner.sync_data().await
|
||||
}
|
||||
|
||||
pub async fn set_len(&self, len: u64) -> Result<(), Error> {
|
||||
self.inner.set_len(len).await
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<Metadata, Error> {
|
||||
self.inner.metadata().await
|
||||
}
|
||||
@@ -669,6 +673,13 @@ impl VirtualFileInner {
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn set_len(&self, len: u64) -> Result<(), Error> {
|
||||
with_file!(self, StorageIoOperation::SetLen, |file_guard| {
|
||||
let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
|
||||
res.maybe_fatal_err("set_len")
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper function internal to `VirtualFile` that looks up the underlying File,
|
||||
/// opens it and evicts some other File if necessary. The passed parameter is
|
||||
/// assumed to be a function available for the physical `File`.
|
||||
|
||||
@@ -209,6 +209,22 @@ impl IoEngine {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn set_len(
|
||||
&self,
|
||||
file_guard: FileGuard,
|
||||
len: u64,
|
||||
) -> (FileGuard, std::io::Result<()>) {
|
||||
match self {
|
||||
IoEngine::NotSet => panic!("not initialized"),
|
||||
// TODO: ftruncate op for tokio-epoll-uring
|
||||
IoEngine::StdFs | IoEngine::TokioEpollUring => {
|
||||
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
|
||||
(file_guard, res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn write_at<B: IoBuf + Send>(
|
||||
&self,
|
||||
file_guard: FileGuard,
|
||||
|
||||
Reference in New Issue
Block a user