From 0be19f4a6da8f23ded8510f1a9bf2f2c355665e2 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 3 Apr 2024 16:12:26 +0000 Subject: [PATCH] bootstrap_timeline: use tokio::fs for walingestion & use BufRead/BufWrite --- libs/utils/src/zstd.rs | 12 +++++++++--- pageserver/src/import_datadir.rs | 14 +++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/libs/utils/src/zstd.rs b/libs/utils/src/zstd.rs index be2dcc00f5..06a53c6eef 100644 --- a/libs/utils/src/zstd.rs +++ b/libs/utils/src/zstd.rs @@ -22,12 +22,16 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<( let file = OpenOptions::new() .create(true) .truncate(true) - .read(true) .write(true) .open(&tarball) .await .with_context(|| format!("tempfile creation {tarball}"))?; + let buffered_file = tokio::io::BufWriter::with_capacity( + 128 * 1024, /* TODO use BUFFER_SIZE, same with other constant */ + file, + ); + let mut paths = Vec::new(); for entry in WalkDir::new(path) { let entry = entry?; @@ -42,7 +46,7 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<( // Do a sort to get a more consistent listing paths.sort_unstable(); let zstd = ZstdEncoder::with_quality_and_params( - file, + buffered_file, Level::Default, &[CParameter::enable_long_distance_matching(true)], ); @@ -60,7 +64,9 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<( } let mut zstd = builder.into_inner().await?; zstd.shutdown().await?; - let mut compressed = zstd.into_inner(); + let mut compressed_buffered: tokio::io::BufWriter<_> = zstd.into_inner(); + compressed_buffered.flush().await?; + let mut compressed = compressed_buffered.into_inner(); let compressed_len = compressed.metadata().await?.len(); compressed.seek(SeekFrom::Start(0)).await?; Ok((compressed, compressed_len)) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 343dec2ca1..1a01dee073 100644 --- 
a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -72,10 +72,11 @@ pub async fn import_timeline_from_postgres_datadir( let absolute_path = entry.path(); let relative_path = absolute_path.strip_prefix(pgdata_path)?; - let mut file = tokio::fs::File::open(absolute_path).await?; + let file = tokio::fs::File::open(absolute_path).await?; + let mut bufread = tokio::io::BufReader::with_capacity(128 * 1024, file); let len = metadata.len() as usize; if let Some(control_file) = - import_file(&mut modification, relative_path, &mut file, len, ctx).await? + import_file(&mut modification, relative_path, &mut bufread, len, ctx).await? { pg_control = Some(control_file); } @@ -288,15 +289,14 @@ async fn import_wal( } // Slurp the WAL file - let mut file = std::fs::File::open(&path)?; + let mut file = tokio::fs::File::open(&path).await?; if offset > 0 { - use std::io::Seek; - file.seek(std::io::SeekFrom::Start(offset as u64))?; + use tokio::io::AsyncSeekExt; + file.seek(std::io::SeekFrom::Start(offset as u64)).await?; } - use std::io::Read; - let nread = file.read_to_end(&mut buf)?; + let nread = file.read_to_end(&mut buf).await?; if nread != WAL_SEGMENT_SIZE - offset { // Maybe allow this for .partial files? error!("read only {} bytes from WAL file", nread);