diff --git a/Cargo.lock b/Cargo.lock index 46170717d6..ad1fc67219 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2339,12 +2339,12 @@ dependencies = [ "signal-hook", "storage_broker", "svg_fmt", - "tar", "tempfile", "tenant_size_model", "thiserror", "tokio", "tokio-postgres", + "tokio-tar", "tokio-util", "toml_edit", "tracing", @@ -3970,6 +3970,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tar" +version = "0.3.0" +source = "git+https://github.com/neondatabase/tokio-tar.git?rev=404df61437de0feef49ba2ccdbdd94eb8ad6e142#404df61437de0feef49ba2ccdbdd94eb8ad6e142" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall", + "tokio", + "tokio-stream", + "xattr", +] + [[package]] name = "tokio-util" version = "0.7.4" diff --git a/libs/utils/src/postgres_backend_async.rs b/libs/utils/src/postgres_backend_async.rs index dc93131b61..de547c3242 100644 --- a/libs/utils/src/postgres_backend_async.rs +++ b/libs/utils/src/postgres_backend_async.rs @@ -5,7 +5,7 @@ use crate::postgres_backend::AuthType; use anyhow::{bail, Context, Result}; -use bytes::{Bytes, BytesMut}; +use bytes::{Buf, Bytes, BytesMut}; use pq_proto::{BeMessage, FeMessage, FeStartupPacket}; use std::future::Future; use std::net::SocketAddr; @@ -114,7 +114,10 @@ impl AsyncRead for Stream { pub struct PostgresBackend { stream: Stream, + // Output buffer. c.f. BeMessage::write why we are using BytesMut here. + // The data between 0 and "current position" as tracked by the bytes::Buf + // implementation of BytesMut, have already been written. buf_out: BytesMut, pub state: ProtoState, @@ -174,10 +177,13 @@ impl PostgresBackend { } /// Flush output buffer into the socket. - pub async fn flush(&mut self) -> std::io::Result<&mut Self> { - self.stream.write_all(&self.buf_out).await?; + pub async fn flush(&mut self) -> std::io::Result<()> { + while self.buf_out.has_remaining() { + let bytes_written = self.stream.write(self.buf_out.chunk()).await?; + self.buf_out.advance(bytes_written); + } self.buf_out.clear(); - Ok(self) + Ok(()) } /// Write message into internal output buffer. @@ -186,6 +192,36 @@ impl PostgresBackend { Ok(self) } + /// Returns an AsyncWrite implementation that wraps all the data written + /// to it in CopyData messages, and writes them to the connection + /// + /// The caller is responsible for sending CopyOutResponse and CopyDone messages. + pub fn copyout_writer(&mut self) -> CopyDataWriter { + CopyDataWriter { pgb: self } + } + + /// A polling function that tries to write all the data from 'buf_out' to the + /// underlying stream. + fn poll_write_buf( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + while self.buf_out.has_remaining() { + match Pin::new(&mut self.stream).poll_write(cx, self.buf_out.chunk()) { + Poll::Ready(Ok(bytes_written)) => { + self.buf_out.advance(bytes_written); + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + } + } + Poll::Ready(Ok(())) + } + + fn poll_flush(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_flush(cx) + } + // Wrapper for run_message_loop() that shuts down socket when we are done pub async fn run(mut self, handler: &mut impl Handler, shutdown_watcher: F) -> Result<()> where @@ -458,3 +494,64 @@ impl PostgresBackend { Ok(ProcessMsgResult::Continue) } } + +/// +/// A futures::AsyncWrite implementation that wraps all data written to it in CopyData +/// messages. +/// + +pub struct CopyDataWriter<'a> { + pgb: &'a mut PostgresBackend, +} + +impl<'a> AsyncWrite for CopyDataWriter<'a> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.get_mut(); + + // It's not strictly required to flush between each message, but makes it easier + // to view in wireshark, and usually the messages that the callers write are + // decently-sized anyway. + match this.pgb.poll_write_buf(cx) { + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + } + + // CopyData + // XXX: if the input is large, we should split it into multiple messages. + // Not sure what the threshold should be, but the ultimate hard limit is that + // the length cannot exceed u32. + this.pgb.write_message(&BeMessage::CopyData(buf))?; + + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + match this.pgb.poll_write_buf(cx) { + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + } + this.pgb.poll_flush(cx) + } + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + match this.pgb.poll_write_buf(cx) { + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + } + this.pgb.poll_flush(cx) + } +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index cd12ee0cc9..c0f3c76c4e 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -49,7 +49,7 @@ serde_json = { version = "1.0", features = ["raw_value"] } serde_with = "2.0" signal-hook = "0.3.10" svg_fmt = "0.4.1" -tar = "0.4.33" +tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } thiserror = "1.0" tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 36664e119e..e537048489 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -13,17 +13,22 @@ use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{BufMut, BytesMut}; use fail::fail_point; -use itertools::Itertools; use std::fmt::Write as FmtWrite; -use std::io; -use std::io::Write; use std::sync::Arc; use std::time::SystemTime; -use tar::{Builder, EntryType, Header}; +use tokio::io; +use tokio::io::AsyncWrite; use tracing::*; -use crate::task_mgr; -use crate::tenant::{with_ondemand_download, PageReconstructResult, Timeline}; +/// NB: This relies on a modified version of tokio_tar that does *not* write the +/// end-of-archive marker (1024 zero bytes), when the Builder struct is dropped +/// without explicitly calling 'finish' or 'into_inner'! +/// +/// See https://github.com/neondatabase/tokio-tar/pull/1 +/// +use tokio_tar::{Builder, EntryType, Header}; + +use crate::tenant::{with_ondemand_download, Timeline}; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID}; @@ -39,14 +44,13 @@ use utils::lsn::Lsn; /// used for constructing tarball. pub struct Basebackup<'a, W> where - W: Write, + W: AsyncWrite + Send + Sync + Unpin, { - ar: Builder>, + ar: Builder<&'a mut W>, timeline: &'a Arc, pub lsn: Lsn, prev_record_lsn: Lsn, full_backup: bool, - finished: bool, } // Create basebackup with non-rel data in it. @@ -59,10 +63,10 @@ where // to start the replication. impl<'a, W> Basebackup<'a, W> where - W: Write, + W: AsyncWrite + Send + Sync + Unpin, { pub fn new( - write: W, + write: &'a mut W, timeline: &'a Arc, req_lsn: Option, prev_lsn: Option, @@ -117,22 +121,21 @@ where ); Ok(Basebackup { - ar: Builder::new(AbortableWrite::new(write)), + ar: Builder::new_non_terminated(write), timeline, lsn: backup_lsn, prev_record_lsn: prev_lsn, full_backup, - finished: false, }) } - pub fn send_tarball(mut self) -> anyhow::Result<()> { + pub async fn send_tarball(mut self) -> anyhow::Result<()> { // TODO include checksum // Create pgdata subdirs structure for dir in PGDATA_SUBDIRS.iter() { let header = new_tar_header_dir(dir)?; - self.ar.append(&header, &mut io::empty())?; + self.ar.append(&header, &mut io::empty()).await?; } // Send empty config files. @@ -140,10 +143,10 @@ where if *filepath == "pg_hba.conf" { let data = PG_HBA.as_bytes(); let header = new_tar_header(filepath, data.len() as u64)?; - self.ar.append(&header, data)?; + self.ar.append(&header, data).await?; } else { let header = new_tar_header(filepath, 0)?; - self.ar.append(&header, &mut io::empty())?; + self.ar.append(&header, &mut io::empty()).await?; } } @@ -154,29 +157,30 @@ where SlruKind::MultiXactMembers, ] { for segno in - with_ondemand_download_sync(|| self.timeline.list_slru_segments(kind, self.lsn))? + with_ondemand_download(|| self.timeline.list_slru_segments(kind, self.lsn)).await? { - self.add_slru_segment(kind, segno)?; + self.add_slru_segment(kind, segno).await?; } } // Create tablespace directories for ((spcnode, dbnode), has_relmap_file) in - with_ondemand_download_sync(|| self.timeline.list_dbdirs(self.lsn))? + with_ondemand_download(|| self.timeline.list_dbdirs(self.lsn)).await? { - self.add_dbdir(spcnode, dbnode, has_relmap_file)?; + self.add_dbdir(spcnode, dbnode, has_relmap_file).await?; // Gather and send relational files in each database if full backup is requested. if self.full_backup { - for rel in with_ondemand_download_sync(|| { - self.timeline.list_rels(spcnode, dbnode, self.lsn) - })? { - self.add_rel(rel)?; + for rel in + with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn)) + .await? + { + self.add_rel(rel).await?; } } } - for xid in with_ondemand_download_sync(|| self.timeline.list_twophase_files(self.lsn))? { - self.add_twophase_file(xid)?; + for xid in with_ondemand_download(|| self.timeline.list_twophase_files(self.lsn)).await? { + self.add_twophase_file(xid).await?; } fail_point!("basebackup-before-control-file", |_| { @@ -184,36 +188,32 @@ where }); // Generate pg_control and bootstrap WAL segment. - self.add_pgcontrol_file()?; - self.ar.finish()?; - self.finished = true; + self.add_pgcontrol_file().await?; + self.ar.finish().await?; debug!("all tarred up!"); Ok(()) } - fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> { + async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> { let nblocks = - with_ondemand_download_sync(|| self.timeline.get_rel_size(tag, self.lsn, false))?; - - // Function that adds relation segment data to archive - let mut add_file = |segment_index, data: &Vec| -> anyhow::Result<()> { - let file_name = tag.to_segfile_name(segment_index as u32); - let header = new_tar_header(&file_name, data.len() as u64)?; - self.ar.append(&header, data.as_slice())?; - Ok(()) - }; + with_ondemand_download(|| self.timeline.get_rel_size(tag, self.lsn, false)).await?; // If the relation is empty, create an empty file if nblocks == 0 { - add_file(0, &vec![])?; + let file_name = tag.to_segfile_name(0); + let header = new_tar_header(&file_name, 0)?; + self.ar.append(&header, &mut io::empty()).await?; return Ok(()); } // Add a file for each chunk of blocks (aka segment) - let chunks = (0..nblocks).chunks(RELSEG_SIZE as usize); - for (seg, blocks) in chunks.into_iter().enumerate() { + let mut startblk = 0; + let mut seg = 0; + while startblk < nblocks { + let endblk = std::cmp::min(startblk + RELSEG_SIZE, nblocks); + let mut segment_data: Vec = vec![]; - for blknum in blocks { + for blknum in startblk..endblk { let img = self .timeline .get_rel_page_at_lsn(tag, blknum, self.lsn, false) @@ -221,7 +221,12 @@ where segment_data.extend_from_slice(&img[..]); } - add_file(seg, &segment_data)?; + let file_name = tag.to_segfile_name(seg as u32); + let header = new_tar_header(&file_name, segment_data.len() as u64)?; + self.ar.append(&header, segment_data.as_slice()).await?; + + seg += 1; + startblk = endblk; } Ok(()) @@ -230,17 +235,18 @@ where // // Generate SLRU segment files from repository. // - fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> { - let nblocks = with_ondemand_download_sync(|| { - self.timeline.get_slru_segment_size(slru, segno, self.lsn) - })?; + async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> { + let nblocks = + with_ondemand_download(|| self.timeline.get_slru_segment_size(slru, segno, self.lsn)) + .await?; let mut slru_buf: Vec = Vec::with_capacity(nblocks as usize * BLCKSZ as usize); for blknum in 0..nblocks { - let img = with_ondemand_download_sync(|| { + let img = with_ondemand_download(|| { self.timeline .get_slru_page_at_lsn(slru, segno, blknum, self.lsn) - })?; + }) + .await?; if slru == SlruKind::Clog { ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8); @@ -253,7 +259,7 @@ where let segname = format!("{}/{:>04X}", slru.to_str(), segno); let header = new_tar_header(&segname, slru_buf.len() as u64)?; - self.ar.append(&header, slru_buf.as_slice())?; + self.ar.append(&header, slru_buf.as_slice()).await?; trace!("Added to basebackup slru {} relsize {}", segname, nblocks); Ok(()) @@ -265,16 +271,16 @@ where // Each directory contains a PG_VERSION file, and the default database // directories also contain pg_filenode.map files. // - fn add_dbdir( + async fn add_dbdir( &mut self, spcnode: u32, dbnode: u32, has_relmap_file: bool, ) -> anyhow::Result<()> { let relmap_img = if has_relmap_file { - let img = with_ondemand_download_sync(|| { - self.timeline.get_relmap_file(spcnode, dbnode, self.lsn) - })?; + let img = + with_ondemand_download(|| self.timeline.get_relmap_file(spcnode, dbnode, self.lsn)) + .await?; ensure!(img.len() == 512); Some(img) } else { @@ -284,14 +290,14 @@ where if spcnode == GLOBALTABLESPACE_OID { let pg_version_str = self.timeline.pg_version.to_string(); let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?; - self.ar.append(&header, pg_version_str.as_bytes())?; + self.ar.append(&header, pg_version_str.as_bytes()).await?; info!("timeline.pg_version {}", self.timeline.pg_version); if let Some(img) = relmap_img { // filenode map for global tablespace let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?; - self.ar.append(&header, &img[..])?; + self.ar.append(&header, &img[..]).await?; } else { warn!("global/pg_filenode.map is missing"); } @@ -321,18 +327,18 @@ where // Append dir path for each database let path = format!("base/{}", dbnode); let header = new_tar_header_dir(&path)?; - self.ar.append(&header, &mut io::empty())?; + self.ar.append(&header, &mut io::empty()).await?; if let Some(img) = relmap_img { let dst_path = format!("base/{}/PG_VERSION", dbnode); let pg_version_str = self.timeline.pg_version.to_string(); let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?; - self.ar.append(&header, pg_version_str.as_bytes())?; + self.ar.append(&header, pg_version_str.as_bytes()).await?; let relmap_path = format!("base/{}/pg_filenode.map", dbnode); let header = new_tar_header(&relmap_path, img.len() as u64)?; - self.ar.append(&header, &img[..])?; + self.ar.append(&header, &img[..]).await?; } }; Ok(()) @@ -341,8 +347,8 @@ where // // Extract twophase state files // - fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> { - let img = with_ondemand_download_sync(|| self.timeline.get_twophase_file(xid, self.lsn))?; + async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> { + let img = with_ondemand_download(|| self.timeline.get_twophase_file(xid, self.lsn)).await?; let mut buf = BytesMut::new(); buf.extend_from_slice(&img[..]); @@ -350,7 +356,7 @@ where buf.put_u32_le(crc); let path = format!("pg_twophase/{:>08X}", xid); let header = new_tar_header(&path, buf.len() as u64)?; - self.ar.append(&header, &buf[..])?; + self.ar.append(&header, &buf[..]).await?; Ok(()) } @@ -359,7 +365,7 @@ where // Add generated pg_control file and bootstrap WAL segment. // Also send zenith.signal file with extra bootstrap data. // - fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> { + async fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> { // add zenith.signal file let mut zenith_signal = String::new(); if self.prev_record_lsn == Lsn(0) { @@ -371,17 +377,19 @@ where } else { write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?; } - self.ar.append( - &new_tar_header("zenith.signal", zenith_signal.len() as u64)?, - zenith_signal.as_bytes(), - )?; + self.ar + .append( + &new_tar_header("zenith.signal", zenith_signal.len() as u64)?, + zenith_signal.as_bytes(), + ) + .await?; - let checkpoint_bytes = - with_ondemand_download_sync(|| self.timeline.get_checkpoint(self.lsn)) - .context("failed to get checkpoint bytes")?; - let pg_control_bytes = - with_ondemand_download_sync(|| self.timeline.get_control_file(self.lsn)) - .context("failed get control bytes")?; + let checkpoint_bytes = with_ondemand_download(|| self.timeline.get_checkpoint(self.lsn)) + .await + .context("failed to get checkpoint bytes")?; + let pg_control_bytes = with_ondemand_download(|| self.timeline.get_control_file(self.lsn)) + .await + .context("failed get control bytes")?; let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control( &pg_control_bytes, @@ -392,7 +400,7 @@ where //send pg_control let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; - self.ar.append(&header, &pg_control_bytes[..])?; + self.ar.append(&header, &pg_control_bytes[..]).await?; //send wal segment let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE); @@ -404,24 +412,11 @@ where postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version) .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?; ensure!(wal_seg.len() == WAL_SEGMENT_SIZE); - self.ar.append(&header, &wal_seg[..])?; + self.ar.append(&header, &wal_seg[..]).await?; Ok(()) } } -impl<'a, W> Drop for Basebackup<'a, W> -where - W: Write, -{ - /// If the basebackup was not finished, prevent the Archive::drop() from - /// writing the end-of-archive marker. - fn drop(&mut self) { - if !self.finished { - self.ar.get_mut().abort(); - } - } -} - // // Create new tarball entry header // @@ -457,57 +452,3 @@ fn new_tar_header_dir(path: &str) -> anyhow::Result
{ header.set_cksum(); Ok(header) } - -/// A wrapper that passes through all data to the underlying Write, -/// until abort() is called. -/// -/// tar::Builder has an annoying habit of finishing the archive with -/// a valid tar end-of-archive marker (two 512-byte sectors of zeros), -/// even if an error occurs and we don't finish building the archive. -/// We'd rather abort writing the tarball immediately than construct -/// a seemingly valid but incomplete archive. This wrapper allows us -/// to swallow the end-of-archive marker that Builder::drop() emits, -/// without writing it to the underlying sink. -/// -struct AbortableWrite { - w: W, - aborted: bool, -} - -impl AbortableWrite { - pub fn new(w: W) -> Self { - AbortableWrite { w, aborted: false } - } - - pub fn abort(&mut self) { - self.aborted = true; - } -} - -impl Write for AbortableWrite -where - W: Write, -{ - fn write(&mut self, data: &[u8]) -> io::Result { - if self.aborted { - Ok(data.len()) - } else { - self.w.write(data) - } - } - fn flush(&mut self) -> io::Result<()> { - if self.aborted { - Ok(()) - } else { - self.w.flush() - } - } -} - -fn with_ondemand_download_sync(f: F) -> anyhow::Result -where - F: Send + Fn() -> PageReconstructResult, - T: Send, -{ - task_mgr::COMPUTE_REQUEST_RUNTIME.block_on(with_ondemand_download(f)) -} diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 588b92c13f..bac27f69de 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -2,12 +2,13 @@ //! Import data and WAL from a PostgreSQL data directory and WAL segments into //! a neon Timeline. //! -use std::fs::File; -use std::io::{Read, Seek, SeekFrom}; use std::path::{Path, PathBuf}; use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; +use futures::StreamExt; +use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio_tar::Archive; use tracing::*; use walkdir::WalkDir; @@ -42,7 +43,7 @@ pub fn get_lsn_from_controlfile(path: &Path) -> Result { /// This is currently only used to import a cluster freshly created by initdb. /// The code that deals with the checkpoint would not work right if the /// cluster was not shut down cleanly. -pub fn import_timeline_from_postgres_datadir( +pub async fn import_timeline_from_postgres_datadir( tline: &Timeline, pgdata_path: &Path, pgdata_lsn: Lsn, @@ -65,9 +66,11 @@ pub fn import_timeline_from_postgres_datadir( let absolute_path = entry.path(); let relative_path = absolute_path.strip_prefix(pgdata_path)?; - let file = File::open(absolute_path)?; + let mut file = tokio::fs::File::open(absolute_path).await?; let len = metadata.len() as usize; - if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? { + if let Some(control_file) = + import_file(&mut modification, relative_path, &mut file, len).await? + { pg_control = Some(control_file); } modification.flush()?; @@ -102,12 +105,12 @@ pub fn import_timeline_from_postgres_datadir( } // subroutine of import_timeline_from_postgres_datadir(), to load one relation file. -fn import_rel( - modification: &mut DatadirModification, +async fn import_rel( + modification: &mut DatadirModification<'_>, path: &Path, spcoid: Oid, dboid: Oid, - mut reader: Reader, + reader: &mut (impl AsyncRead + Send + Sync + Unpin), len: usize, ) -> anyhow::Result<()> { // Does it look like a relation file? @@ -148,7 +151,7 @@ fn import_rel( } loop { - let r = reader.read_exact(&mut buf); + let r = reader.read_exact(&mut buf).await; match r { Ok(_) => { modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?; @@ -181,11 +184,11 @@ fn import_rel( /// Import an SLRU segment file /// -fn import_slru( - modification: &mut DatadirModification, +async fn import_slru( + modification: &mut DatadirModification<'_>, slru: SlruKind, path: &Path, - mut reader: Reader, + reader: &mut (impl AsyncRead + Send + Sync + Unpin), len: usize, ) -> anyhow::Result<()> { info!("importing slru file {path:?}"); @@ -206,7 +209,7 @@ fn import_slru( let mut rpageno = 0; loop { - let r = reader.read_exact(&mut buf); + let r = reader.read_exact(&mut buf).await; match r { Ok(_) => { modification.put_slru_page_image( @@ -243,6 +246,7 @@ fn import_wal( startpoint: Lsn, endpoint: Lsn, ) -> anyhow::Result<()> { + use std::io::Read; let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version); let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE); @@ -265,10 +269,11 @@ fn import_wal( } // Slurp the WAL file - let mut file = File::open(&path)?; + let mut file = std::fs::File::open(&path)?; if offset > 0 { - file.seek(SeekFrom::Start(offset as u64))?; + use std::io::Seek; + file.seek(std::io::SeekFrom::Start(offset as u64))?; } let nread = file.read_to_end(&mut buf)?; @@ -310,9 +315,9 @@ fn import_wal( Ok(()) } -pub fn import_basebackup_from_tar( +pub async fn import_basebackup_from_tar( tline: &Timeline, - reader: Reader, + reader: &mut (impl AsyncRead + Send + Sync + Unpin), base_lsn: Lsn, ) -> Result<()> { info!("importing base at {base_lsn}"); @@ -322,21 +327,24 @@ pub fn import_basebackup_from_tar( let mut pg_control: Option = None; // Import base - for base_tar_entry in tar::Archive::new(reader).entries()? { - let entry = base_tar_entry?; + let mut entries = Archive::new(reader).entries()?; + while let Some(base_tar_entry) = entries.next().await { + let mut entry = base_tar_entry?; let header = entry.header(); let len = header.entry_size()? as usize; let file_path = header.path()?.into_owned(); match header.entry_type() { - tar::EntryType::Regular => { - if let Some(res) = import_file(&mut modification, file_path.as_ref(), entry, len)? { + tokio_tar::EntryType::Regular => { + if let Some(res) = + import_file(&mut modification, file_path.as_ref(), &mut entry, len).await? + { // We found the pg_control file. pg_control = Some(res); } modification.flush()?; } - tar::EntryType::Directory => { + tokio_tar::EntryType::Directory => { debug!("directory {:?}", file_path); } _ => { @@ -356,9 +364,9 @@ pub fn import_basebackup_from_tar( Ok(()) } -pub fn import_wal_from_tar( +pub async fn import_wal_from_tar( tline: &Timeline, - reader: Reader, + reader: &mut (impl AsyncRead + Send + Sync + Unpin), start_lsn: Lsn, end_lsn: Lsn, ) -> Result<()> { @@ -371,16 +379,19 @@ pub fn import_wal_from_tar( // Ingest wal until end_lsn info!("importing wal until {}", end_lsn); - let mut pg_wal_tar = tar::Archive::new(reader); - let mut pg_wal_entries_iter = pg_wal_tar.entries()?; + let mut pg_wal_tar = Archive::new(reader); + let mut pg_wal_entries = pg_wal_tar.entries()?; while last_lsn <= end_lsn { let bytes = { - let entry = pg_wal_entries_iter.next().expect("expected more wal")?; + let mut entry = pg_wal_entries + .next() + .await + .ok_or_else(|| anyhow::anyhow!("expected more wal"))??; let header = entry.header(); let file_path = header.path()?.into_owned(); match header.entry_type() { - tar::EntryType::Regular => { + tokio_tar::EntryType::Regular => { // FIXME: assume postgresql tli 1 for now let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE); let file_name = file_path @@ -390,9 +401,9 @@ pub fn import_wal_from_tar( ensure!(expected_filename == file_name); debug!("processing wal file {:?}", file_path); - read_all_bytes(entry)? + read_all_bytes(&mut entry).await? } - tar::EntryType::Directory => { + tokio_tar::EntryType::Directory => { debug!("directory {:?}", file_path); continue; } @@ -433,7 +444,7 @@ pub fn import_wal_from_tar( } // Log any extra unused files - for e in &mut pg_wal_entries_iter { + while let Some(e) = pg_wal_entries.next().await { let entry = e?; let header = entry.header(); let file_path = header.path()?.into_owned(); @@ -443,10 +454,10 @@ pub fn import_wal_from_tar( Ok(()) } -fn import_file( - modification: &mut DatadirModification, +async fn import_file( + modification: &mut DatadirModification<'_>, file_path: &Path, - reader: Reader, + reader: &mut (impl AsyncRead + Send + Sync + Unpin), len: usize, ) -> Result> { let file_name = match file_path.file_name() { @@ -466,7 +477,7 @@ fn import_file( match file_name.as_ref() { "pg_control" => { - let bytes = read_all_bytes(reader)?; + let bytes = read_all_bytes(reader).await?; // Extract the checkpoint record and import it separately. let pg_control = ControlFileData::decode(&bytes[..])?; @@ -479,7 +490,7 @@ fn import_file( return Ok(Some(pg_control)); } "pg_filenode.map" => { - let bytes = read_all_bytes(reader)?; + let bytes = read_all_bytes(reader).await?; modification.put_relmap_file(spcnode, dbnode, bytes)?; debug!("imported relmap file") } @@ -487,7 +498,7 @@ fn import_file( debug!("ignored PG_VERSION file"); } _ => { - import_rel(modification, file_path, spcnode, dbnode, reader, len)?; + import_rel(modification, file_path, spcnode, dbnode, reader, len).await?; debug!("imported rel creation"); } } @@ -502,7 +513,7 @@ fn import_file( match file_name.as_ref() { "pg_filenode.map" => { - let bytes = read_all_bytes(reader)?; + let bytes = read_all_bytes(reader).await?; modification.put_relmap_file(spcnode, dbnode, bytes)?; debug!("imported relmap file") } @@ -510,36 +521,36 @@ fn import_file( debug!("ignored PG_VERSION file"); } _ => { - import_rel(modification, file_path, spcnode, dbnode, reader, len)?; + import_rel(modification, file_path, spcnode, dbnode, reader, len).await?; debug!("imported rel creation"); } } } else if file_path.starts_with("pg_xact") { let slru = SlruKind::Clog; - import_slru(modification, slru, file_path, reader, len)?; + import_slru(modification, slru, file_path, reader, len).await?; debug!("imported clog slru"); } else if file_path.starts_with("pg_multixact/offsets") { let slru = SlruKind::MultiXactOffsets; - import_slru(modification, slru, file_path, reader, len)?; + import_slru(modification, slru, file_path, reader, len).await?; debug!("imported multixact offsets slru"); } else if file_path.starts_with("pg_multixact/members") { let slru = SlruKind::MultiXactMembers; - import_slru(modification, slru, file_path, reader, len)?; + import_slru(modification, slru, file_path, reader, len).await?; debug!("imported multixact members slru"); } else if file_path.starts_with("pg_twophase") { let xid = u32::from_str_radix(file_name.as_ref(), 16)?; - let bytes = read_all_bytes(reader)?; + let bytes = read_all_bytes(reader).await?; modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?; debug!("imported twophase file"); } else if file_path.starts_with("pg_wal") { debug!("found wal file in base section. ignore it"); } else if file_path.starts_with("zenith.signal") { // Parse zenith signal file to set correct previous LSN - let bytes = read_all_bytes(reader)?; + let bytes = read_all_bytes(reader).await?; // zenith.signal format is "PREV LSN: prev_lsn" // TODO write serialization and deserialization in the same place. let zenith_signal = std::str::from_utf8(&bytes)?.trim(); @@ -576,8 +587,8 @@ fn import_file( Ok(None) } -fn read_all_bytes(mut reader: Reader) -> Result { +async fn read_all_bytes(reader: &mut (impl AsyncRead + Send + Sync + Unpin)) -> Result { let mut buf: Vec = vec![]; - reader.read_to_end(&mut buf)?; + reader.read_to_end(&mut buf).await?; Ok(Bytes::copy_from_slice(&buf[..])) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b84b2694f4..5393fca780 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -26,9 +26,6 @@ use std::str; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use tokio::pin; -use tokio_util::io::StreamReader; -use tokio_util::io::SyncIoBridge; use tracing::*; use utils::id::ConnectionId; use utils::{ @@ -395,9 +392,7 @@ impl PageServerHandler { pgb.write_message(&BeMessage::CopyInResponse)?; pgb.flush().await?; - let copyin_stream = copyin_stream(pgb); - pin!(copyin_stream); - + let mut copyin_stream = Box::pin(copyin_stream(pgb)); timeline .import_basebackup_from_tar(&mut copyin_stream, base_lsn) .await?; @@ -443,8 +438,8 @@ impl PageServerHandler { pgb.write_message(&BeMessage::CopyInResponse)?; pgb.flush().await?; let mut copyin_stream = Box::pin(copyin_stream(pgb)); - let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream)); - tokio::task::block_in_place(|| import_wal_from_tar(&timeline, reader, start_lsn, end_lsn))?; + let mut reader = tokio_util::io::StreamReader::new(&mut copyin_stream); + import_wal_from_tar(&timeline, &mut reader, start_lsn, end_lsn).await?; info!("wal import complete"); // Drain the rest of the Copy data @@ -649,16 +644,14 @@ impl PageServerHandler { pgb.flush().await?; /* Send a tarball of the latest layer on the timeline */ - let mut writer = CopyDataSink { - pgb, - rt: tokio::runtime::Handle::current(), - }; - tokio::task::block_in_place(|| { + { + let mut writer = pgb.copyout_writer(); let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn, prev_lsn, full_backup)?; tracing::Span::current().record("lsn", basebackup.lsn.to_string().as_str()); - basebackup.send_tarball() - })?; + basebackup.send_tarball().await?; + } + pgb.write_message(&BeMessage::CopyDone)?; pgb.flush().await?; info!("basebackup complete"); @@ -966,32 +959,3 @@ async fn get_active_timeline_with_timeout( .await .and_then(|tenant| tenant.get_timeline(timeline_id, true)) } - -/// -/// A std::io::Write implementation that wraps all data written to it in CopyData -/// messages. -/// -struct CopyDataSink<'a> { - pgb: &'a mut PostgresBackend, - rt: tokio::runtime::Handle, -} - -impl<'a> io::Write for CopyDataSink<'a> { - fn write(&mut self, data: &[u8]) -> io::Result { - // CopyData - // FIXME: if the input is large, we should split it into multiple messages. - // Not sure what the threshold should be, but the ultimate hard limit is that - // the length cannot exceed u32. - // FIXME: flush isn't really required, but makes it easier - // to view in wireshark - self.pgb.write_message(&BeMessage::CopyData(data))?; - self.rt.block_on(self.pgb.flush())?; - trace!("CopyData sent for {} bytes!", data.len()); - - Ok(data.len()) - } - fn flush(&mut self) -> io::Result<()> { - // no-op - Ok(()) - } -} diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 4c93490177..dcaa8ea268 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -18,8 +18,6 @@ use pageserver_api::models::TimelineState; use remote_storage::DownloadError; use remote_storage::GenericRemoteStorage; use tokio::sync::watch; -use tokio_util::io::StreamReader; -use tokio_util::io::SyncIoBridge; use tracing::*; use utils::crashsafe::path_with_suffix_extension; @@ -36,7 +34,6 @@ use std::io::Write; use std::ops::Bound::Included; use std::path::Path; use std::path::PathBuf; -use std::pin::Pin; use std::process::Command; use std::process::Stdio; use std::sync::Arc; @@ -236,21 +233,15 @@ impl UninitializedTimeline<'_> { /// Prepares timeline data by loading it from the basebackup archive. pub async fn import_basebackup_from_tar( self, - mut copyin_stream: &mut Pin<&mut impl Stream>>, + copyin_stream: &mut (impl Stream> + Sync + Send + Unpin), base_lsn: Lsn, ) -> anyhow::Result> { let raw_timeline = self.raw_timeline()?; - // import_basebackup_from_tar() is not async, mainly because the Tar crate - // it uses is not async. So we need to jump through some hoops: - // - convert the input from client connection to a synchronous Read - // - use block_in_place() - let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream)); - - tokio::task::block_in_place(|| { - import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn) - .context("Failed to import basebackup") - })?; + let mut reader = tokio_util::io::StreamReader::new(copyin_stream); + import_datadir::import_basebackup_from_tar(raw_timeline, &mut reader, base_lsn) + .await + .context("Failed to import basebackup")?; // Flush loop needs to be spawned in order to be able to flush. // We want to run proper checkpoint before we mark timeline as available to outside world @@ -2139,13 +2130,12 @@ impl Tenant { let tenant_id = raw_timeline.owning_tenant.tenant_id; let unfinished_timeline = raw_timeline.raw_timeline()?; - tokio::task::block_in_place(|| { - import_datadir::import_timeline_from_postgres_datadir( - unfinished_timeline, - pgdata_path, - pgdata_lsn, - ) - }) + import_datadir::import_timeline_from_postgres_datadir( + unfinished_timeline, + pgdata_path, + pgdata_lsn, + ) + .await .with_context(|| { format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}") })?; diff --git a/test_runner/regress/test_config.py b/test_runner/regress/test_config.py old mode 100644 new mode 100755