diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 452cd73f76..d95d75449d 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -2,9 +2,8 @@ //! Import data and WAL from a PostgreSQL data directory and WAL segments into //! a neon Timeline. //! +use std::io::SeekFrom; use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::task::{self, Poll}; use anyhow::{bail, ensure, Context, Result}; use async_compression::tokio::bufread::ZstdDecoder; @@ -13,7 +12,8 @@ use bytes::Bytes; use camino::Utf8Path; use futures::StreamExt; use nix::NixPath; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio_tar::Archive; use tokio_tar::Builder; use tokio_tar::HeaderMode; @@ -629,70 +629,16 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result Ok(Bytes::from(buf)) } -/// An in-memory buffer implementing `AsyncWrite`, inserting yields every now and then -/// -/// The number of yields is bounded by above by the number of times poll_write is called, -/// so calling it with 8 KB chunks and 8 MB chunks gives the same number of yields in total. -/// This is an explicit choice as the `YieldingVec` is meant to give the async executor -/// breathing room between units of CPU intensive preparation of buffers to be written. -/// Once a write call is issued, the whole buffer has been prepared already, so there is no -/// gain in splitting up the memcopy further. -struct YieldingVec { - yield_budget: usize, - // the buffer written into - buf: Vec, -} +pub async fn create_tar_zst(pgdata_path: &Utf8Path, tmp_path: &Utf8Path) -> Result<(File, u64)> { + let file = OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(&tmp_path) + .await + .with_context(|| format!("tempfile creation {tmp_path}"))?; -impl YieldingVec { - fn new() -> Self { - Self { - yield_budget: 0, - buf: Vec::new(), - } - } - // Whether we should yield for a read operation of given size - fn should_yield(&mut self, add_buf_len: usize) -> bool { - // Set this limit to a small value so that we are a - // good async citizen and yield repeatedly (but not - // too often for many small writes to cause many yields) - const YIELD_DIST: usize = 1024; - - let target_buf_len = self.buf.len() + add_buf_len; - let ret = self.yield_budget / YIELD_DIST < target_buf_len / YIELD_DIST; - if self.yield_budget < target_buf_len { - self.yield_budget += add_buf_len; - } - ret - } -} - -impl AsyncWrite for YieldingVec { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.should_yield(buf.len()) { - cx.waker().wake_by_ref(); - return Poll::Pending; - } - self.get_mut().buf.extend_from_slice(buf); - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } -} - -pub async fn create_tar_zst(pgdata_path: &Utf8Path) -> Result> { let mut paths = Vec::new(); for entry in WalkDir::new(pgdata_path) { let entry = entry?; @@ -707,7 +653,7 @@ pub async fn create_tar_zst(pgdata_path: &Utf8Path) -> Result> { // Do a sort to get a more consistent listing paths.sort_unstable(); let zstd = ZstdEncoder::with_quality_and_params( - YieldingVec::new(), + file, Level::Default, &[CParameter::enable_long_distance_matching(true)], ); @@ -725,13 +671,14 @@ pub async fn create_tar_zst(pgdata_path: &Utf8Path) -> Result> { } let mut zstd = builder.into_inner().await?; zstd.shutdown().await?; - let compressed = zstd.into_inner(); - let compressed_len = compressed.buf.len(); - const INITDB_TAR_ZST_WARN_LIMIT: usize = 2_000_000; + let mut compressed = zstd.into_inner(); + let compressed_len = compressed.metadata().await?.len(); + const INITDB_TAR_ZST_WARN_LIMIT: u64 = 2 * 1024 * 1024; if compressed_len > INITDB_TAR_ZST_WARN_LIMIT { warn!("compressed {INITDB_PATH} size of {compressed_len} is above limit {INITDB_TAR_ZST_WARN_LIMIT}."); } - Ok(compressed.buf) + compressed.seek(SeekFrom::Start(0)).await?; + Ok((compressed, compressed_len)) } pub async fn extract_tar_zst( diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d756e13316..48f71d7747 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -12,7 +12,6 @@ //! use anyhow::{bail, Context}; -use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use enumset::EnumSet; use futures::stream::FuturesUnordered; @@ -69,6 +68,7 @@ use crate::tenant::config::TenantConfOpt; use crate::tenant::metadata::load_metadata; pub use crate::tenant::remote_timeline_client::index::IndexPart; use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart; +use crate::tenant::remote_timeline_client::INITDB_PATH; use crate::tenant::storage_layer::DeltaLayer; use crate::tenant::storage_layer::ImageLayer; use crate::InitializationOrder; @@ -2949,10 +2949,10 @@ impl Tenant { }; // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/` // temporary directory for basebackup files for the given timeline. + + let timelines_path = self.conf.timelines_path(&self.tenant_shard_id); let pgdata_path = path_with_suffix_extension( - self.conf - .timelines_path(&self.tenant_shard_id) - .join(format!("basebackup-{timeline_id}")), + timelines_path.join(format!("basebackup-{timeline_id}")), TEMP_FILE_SUFFIX, ); @@ -2983,31 +2983,43 @@ impl Tenant { ) .await .context("download initdb tar")?; - let buf_read = Box::pin(BufReader::new(initdb_tar_zst)); + let buf_read = + BufReader::with_capacity(remote_timeline_client::BUFFER_SIZE, initdb_tar_zst); import_datadir::extract_tar_zst(&pgdata_path, buf_read) .await .context("extract initdb tar")?; - if initdb_tar_zst_path.exists() { - tokio::fs::remove_file(&initdb_tar_zst_path) - .await - .context("tempfile removal")?; - } + tokio::fs::remove_file(&initdb_tar_zst_path) + .await + .or_else(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + // If something else already removed the file, ignore the error + Ok(()) + } else { + Err(e) + } + }) + .with_context(|| format!("tempfile removal {initdb_tar_zst_path}"))?; } else { // Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?; // Upload the created data dir to S3 if let Some(storage) = &self.remote_storage { - let pgdata_zstd = import_datadir::create_tar_zst(&pgdata_path).await?; - let pgdata_zstd = Bytes::from(pgdata_zstd); + let temp_path = timelines_path.join(format!( + "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}" + )); + + let (pgdata_zstd, tar_zst_size) = + import_datadir::create_tar_zst(&pgdata_path, &temp_path).await?; backoff::retry( || async { self::remote_timeline_client::upload_initdb_dir( storage, &self.tenant_shard_id.tenant_id, &timeline_id, - pgdata_zstd.clone(), + pgdata_zstd.try_clone().await?, + tar_zst_size, ) .await }, @@ -3019,6 +3031,18 @@ impl Tenant { backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await?; + + tokio::fs::remove_file(&temp_path) + .await + .or_else(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + // If something else already removed the file, ignore the error + Ok(()) + } else { + Err(e) + } + }) + .with_context(|| format!("tempfile removal {temp_path}"))?; } } let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align(); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 1ef9fe4a64..03600cf5ae 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -255,7 +255,7 @@ pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3; pub(crate) const INITDB_PATH: &str = "initdb.tar.zst"; /// Default buffer size when interfacing with [`tokio::fs::File`]. -const BUFFER_SIZE: usize = 32 * 1024; +pub(crate) const BUFFER_SIZE: usize = 32 * 1024; pub enum MaybeDeletedIndexPart { IndexPart(IndexPart), diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index ce942b56f8..ed32c4eed9 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -402,7 +402,9 @@ pub(crate) async fn download_initdb_tar_zst( .with_context(|| format!("timeline dir creation {timeline_path}")) .map_err(DownloadError::Other)?; } - let temp_path = timeline_path.join(format!("{INITDB_PATH}-{timeline_id}.{TEMP_FILE_SUFFIX}")); + let temp_path = timeline_path.join(format!( + "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}" + )); let file = download_retry( || async { @@ -438,10 +440,10 @@ pub(crate) async fn download_initdb_tar_zst( ) .await .map_err(|e| { - if temp_path.exists() { - // Do a best-effort attempt at deleting the temporary file upon encountering an error. - // We don't have async here nor do we want to pile on any extra errors. - if let Err(e) = std::fs::remove_file(&temp_path) { + // Do a best-effort attempt at deleting the temporary file upon encountering an error. + // We don't have async here nor do we want to pile on any extra errors. + if let Err(e) = std::fs::remove_file(&temp_path) { + if e.kind() != std::io::ErrorKind::NotFound { warn!("error deleting temporary file {temp_path}: {e}"); } } diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index e1dea3ab4b..d0744e7c83 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -1,12 +1,11 @@ //! Helper functions to upload files to remote storage with a RemoteStorage use anyhow::{bail, Context}; -use bytes::Bytes; use camino::Utf8Path; use fail::fail_point; use pageserver_api::shard::TenantShardId; use std::io::ErrorKind; -use tokio::fs; +use tokio::fs::{self, File}; use super::Generation; use crate::{ @@ -120,17 +119,16 @@ pub(crate) async fn upload_initdb_dir( storage: &GenericRemoteStorage, tenant_id: &TenantId, timeline_id: &TimelineId, - initdb_dir: Bytes, + initdb_tar_zst: File, + size: u64, ) -> anyhow::Result<()> { tracing::trace!("uploading initdb dir"); - let size = initdb_dir.len(); - - let bytes = futures::stream::once(futures::future::ready(Ok(initdb_dir))); + let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE); let remote_path = remote_initdb_archive_path(tenant_id, timeline_id); storage - .upload_storage_object(bytes, size, &remote_path) + .upload_storage_object(file, size as usize, &remote_path) .await .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'")) }