bootstrap_timeline: use tokio::fs for walingestion & use BufRead/BufWrite

This commit is contained in:
Christian Schwarz
2024-04-03 16:12:26 +00:00
parent 944313ffe1
commit 0be19f4a6d
2 changed files with 16 additions and 10 deletions

View File

@@ -22,12 +22,16 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<(
let file = OpenOptions::new()
.create(true)
.truncate(true)
.read(true)
.write(true)
.open(&tarball)
.await
.with_context(|| format!("tempfile creation {tarball}"))?;
let buffered_file = tokio::io::BufWriter::with_capacity(
128 * 1024, /* TODO use BUFFER_SIZE, same with other constant */
file,
);
let mut paths = Vec::new();
for entry in WalkDir::new(path) {
let entry = entry?;
@@ -42,7 +46,7 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<(
// Do a sort to get a more consistent listing
paths.sort_unstable();
let zstd = ZstdEncoder::with_quality_and_params(
file,
buffered_file,
Level::Default,
&[CParameter::enable_long_distance_matching(true)],
);
@@ -60,7 +64,9 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<(
}
let mut zstd = builder.into_inner().await?;
zstd.shutdown().await?;
let mut compressed = zstd.into_inner();
let mut compressed_buffered: tokio::io::BufWriter<File> = zstd.into_inner();
compressed_buffered.flush().await?;
let mut compressed = compressed_buffered.into_inner();
let compressed_len = compressed.metadata().await?.len();
compressed.seek(SeekFrom::Start(0)).await?;
Ok((compressed, compressed_len))

View File

@@ -72,10 +72,11 @@ pub async fn import_timeline_from_postgres_datadir(
let absolute_path = entry.path();
let relative_path = absolute_path.strip_prefix(pgdata_path)?;
let mut file = tokio::fs::File::open(absolute_path).await?;
let file = tokio::fs::File::open(absolute_path).await?;
let mut bufread = tokio::io::BufReader::with_capacity(128 * 1024, file);
let len = metadata.len() as usize;
if let Some(control_file) =
import_file(&mut modification, relative_path, &mut file, len, ctx).await?
import_file(&mut modification, relative_path, &mut bufread, len, ctx).await?
{
pg_control = Some(control_file);
}
@@ -288,15 +289,14 @@ async fn import_wal(
}
// Slurp the WAL file
let mut file = std::fs::File::open(&path)?;
let mut file = tokio::fs::File::open(&path).await?;
if offset > 0 {
use std::io::Seek;
file.seek(std::io::SeekFrom::Start(offset as u64))?;
use tokio::io::AsyncSeekExt;
file.seek(std::io::SeekFrom::Start(offset as u64)).await?;
}
use std::io::Read;
let nread = file.read_to_end(&mut buf)?;
let nread = file.read_to_end(&mut buf).await?;
if nread != WAL_SEGMENT_SIZE - offset {
// Maybe allow this for .partial files?
error!("read only {} bytes from WAL file", nread);