Replace 'tar' crate with 'tokio-tar' (#3202)

The synchronous 'tar' crate has required us to use block_in_place and
SyncIoBridge to work together with the async I/O in the client
connection. Switch to 'tokio-tar' crate that uses async I/O natively.

As part of this, move the CopyDataWriter implementation to
postgres_backend_async.rs. Even though it's only used in one place
currently, it's in principle generally applicable whenever you want to
use COPY out.

Unfortunately we cannot use the 'tokio-tar' as it is: the Builder
implementation requires the writer to have 'static lifetime. So we
have to use a modified version without that requirement. The 'static
lifetime was required just for the Drop implementation that writes
the end-of-archive sections if the Builder is dropped without calling
`finish`. But we don't actually want that behavior anyway; in fact
we had to jump through some hoops with the AbortableWrite hack to skip
those. With the modified version of 'tokio-tar' without that Drop
implementation, we don't need AbortableWrite either.

Co-authored-by: Kirill Bulatov <kirill@neon.tech>
This commit is contained in:
Heikki Linnakangas
2023-01-03 12:39:11 +02:00
committed by GitHub
parent 5bc9f8eae0
commit 0a0e55c3d0
8 changed files with 278 additions and 261 deletions

16
Cargo.lock generated
View File

@@ -2339,12 +2339,12 @@ dependencies = [
"signal-hook",
"storage_broker",
"svg_fmt",
"tar",
"tempfile",
"tenant_size_model",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-tar",
"tokio-util",
"toml_edit",
"tracing",
@@ -3970,6 +3970,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tar"
version = "0.3.0"
source = "git+https://github.com/neondatabase/tokio-tar.git?rev=404df61437de0feef49ba2ccdbdd94eb8ad6e142#404df61437de0feef49ba2ccdbdd94eb8ad6e142"
dependencies = [
"filetime",
"futures-core",
"libc",
"redox_syscall",
"tokio",
"tokio-stream",
"xattr",
]
[[package]]
name = "tokio-util"
version = "0.7.4"

View File

@@ -5,7 +5,7 @@
use crate::postgres_backend::AuthType;
use anyhow::{bail, Context, Result};
use bytes::{Bytes, BytesMut};
use bytes::{Buf, Bytes, BytesMut};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket};
use std::future::Future;
use std::net::SocketAddr;
@@ -114,7 +114,10 @@ impl AsyncRead for Stream {
pub struct PostgresBackend {
stream: Stream,
// Output buffer. c.f. BeMessage::write why we are using BytesMut here.
// The data between 0 and "current position" as tracked by the bytes::Buf
// implementation of BytesMut, have already been written.
buf_out: BytesMut,
pub state: ProtoState,
@@ -174,10 +177,13 @@ impl PostgresBackend {
}
/// Flush output buffer into the socket.
pub async fn flush(&mut self) -> std::io::Result<&mut Self> {
self.stream.write_all(&self.buf_out).await?;
pub async fn flush(&mut self) -> std::io::Result<()> {
while self.buf_out.has_remaining() {
let bytes_written = self.stream.write(self.buf_out.chunk()).await?;
self.buf_out.advance(bytes_written);
}
self.buf_out.clear();
Ok(self)
Ok(())
}
/// Write message into internal output buffer.
@@ -186,6 +192,36 @@ impl PostgresBackend {
Ok(self)
}
/// Returns an AsyncWrite implementation that wraps all the data written
/// to it in CopyData messages, and writes them to the connection
///
/// The caller is responsible for sending CopyOutResponse and CopyDone messages.
pub fn copyout_writer(&mut self) -> CopyDataWriter {
CopyDataWriter { pgb: self }
}
/// A polling function that tries to write all the data from 'buf_out' to the
/// underlying stream.
fn poll_write_buf(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
while self.buf_out.has_remaining() {
match Pin::new(&mut self.stream).poll_write(cx, self.buf_out.chunk()) {
Poll::Ready(Ok(bytes_written)) => {
self.buf_out.advance(bytes_written);
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready(Ok(()))
}
fn poll_flush(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.stream).poll_flush(cx)
}
// Wrapper for run_message_loop() that shuts down socket when we are done
pub async fn run<F, S>(mut self, handler: &mut impl Handler, shutdown_watcher: F) -> Result<()>
where
@@ -458,3 +494,64 @@ impl PostgresBackend {
Ok(ProcessMsgResult::Continue)
}
}
///
/// A futures::AsyncWrite implementation that wraps all data written to it in CopyData
/// messages.
///
pub struct CopyDataWriter<'a> {
pgb: &'a mut PostgresBackend,
}
impl<'a> AsyncWrite for CopyDataWriter<'a> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();
// It's not strictly required to flush between each message, but makes it easier
// to view in wireshark, and usually the messages that the callers write are
// decently-sized anyway.
match this.pgb.poll_write_buf(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
}
// CopyData
// XXX: if the input is large, we should split it into multiple messages.
// Not sure what the threshold should be, but the ultimate hard limit is that
// the length cannot exceed u32.
this.pgb.write_message(&BeMessage::CopyData(buf))?;
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();
match this.pgb.poll_write_buf(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
}
this.pgb.poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();
match this.pgb.poll_write_buf(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
}
this.pgb.poll_flush(cx)
}
}

View File

@@ -49,7 +49,7 @@ serde_json = { version = "1.0", features = ["raw_value"] }
serde_with = "2.0"
signal-hook = "0.3.10"
svg_fmt = "0.4.1"
tar = "0.4.33"
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
thiserror = "1.0"
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }

View File

@@ -13,17 +13,22 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{BufMut, BytesMut};
use fail::fail_point;
use itertools::Itertools;
use std::fmt::Write as FmtWrite;
use std::io;
use std::io::Write;
use std::sync::Arc;
use std::time::SystemTime;
use tar::{Builder, EntryType, Header};
use tokio::io;
use tokio::io::AsyncWrite;
use tracing::*;
use crate::task_mgr;
use crate::tenant::{with_ondemand_download, PageReconstructResult, Timeline};
/// NB: This relies on a modified version of tokio_tar that does *not* write the
/// end-of-archive marker (1024 zero bytes), when the Builder struct is dropped
/// without explicitly calling 'finish' or 'into_inner'!
///
/// See https://github.com/neondatabase/tokio-tar/pull/1
///
use tokio_tar::{Builder, EntryType, Header};
use crate::tenant::{with_ondemand_download, Timeline};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
@@ -39,14 +44,13 @@ use utils::lsn::Lsn;
/// used for constructing tarball.
pub struct Basebackup<'a, W>
where
W: Write,
W: AsyncWrite + Send + Sync + Unpin,
{
ar: Builder<AbortableWrite<W>>,
ar: Builder<&'a mut W>,
timeline: &'a Arc<Timeline>,
pub lsn: Lsn,
prev_record_lsn: Lsn,
full_backup: bool,
finished: bool,
}
// Create basebackup with non-rel data in it.
@@ -59,10 +63,10 @@ where
// to start the replication.
impl<'a, W> Basebackup<'a, W>
where
W: Write,
W: AsyncWrite + Send + Sync + Unpin,
{
pub fn new(
write: W,
write: &'a mut W,
timeline: &'a Arc<Timeline>,
req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
@@ -117,22 +121,21 @@ where
);
Ok(Basebackup {
ar: Builder::new(AbortableWrite::new(write)),
ar: Builder::new_non_terminated(write),
timeline,
lsn: backup_lsn,
prev_record_lsn: prev_lsn,
full_backup,
finished: false,
})
}
pub fn send_tarball(mut self) -> anyhow::Result<()> {
pub async fn send_tarball(mut self) -> anyhow::Result<()> {
// TODO include checksum
// Create pgdata subdirs structure
for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(dir)?;
self.ar.append(&header, &mut io::empty())?;
self.ar.append(&header, &mut io::empty()).await?;
}
// Send empty config files.
@@ -140,10 +143,10 @@ where
if *filepath == "pg_hba.conf" {
let data = PG_HBA.as_bytes();
let header = new_tar_header(filepath, data.len() as u64)?;
self.ar.append(&header, data)?;
self.ar.append(&header, data).await?;
} else {
let header = new_tar_header(filepath, 0)?;
self.ar.append(&header, &mut io::empty())?;
self.ar.append(&header, &mut io::empty()).await?;
}
}
@@ -154,29 +157,30 @@ where
SlruKind::MultiXactMembers,
] {
for segno in
with_ondemand_download_sync(|| self.timeline.list_slru_segments(kind, self.lsn))?
with_ondemand_download(|| self.timeline.list_slru_segments(kind, self.lsn)).await?
{
self.add_slru_segment(kind, segno)?;
self.add_slru_segment(kind, segno).await?;
}
}
// Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in
with_ondemand_download_sync(|| self.timeline.list_dbdirs(self.lsn))?
with_ondemand_download(|| self.timeline.list_dbdirs(self.lsn)).await?
{
self.add_dbdir(spcnode, dbnode, has_relmap_file)?;
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
// Gather and send relational files in each database if full backup is requested.
if self.full_backup {
for rel in with_ondemand_download_sync(|| {
self.timeline.list_rels(spcnode, dbnode, self.lsn)
})? {
self.add_rel(rel)?;
for rel in
with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn))
.await?
{
self.add_rel(rel).await?;
}
}
}
for xid in with_ondemand_download_sync(|| self.timeline.list_twophase_files(self.lsn))? {
self.add_twophase_file(xid)?;
for xid in with_ondemand_download(|| self.timeline.list_twophase_files(self.lsn)).await? {
self.add_twophase_file(xid).await?;
}
fail_point!("basebackup-before-control-file", |_| {
@@ -184,36 +188,32 @@ where
});
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file()?;
self.ar.finish()?;
self.finished = true;
self.add_pgcontrol_file().await?;
self.ar.finish().await?;
debug!("all tarred up!");
Ok(())
}
fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
let nblocks =
with_ondemand_download_sync(|| self.timeline.get_rel_size(tag, self.lsn, false))?;
// Function that adds relation segment data to archive
let mut add_file = |segment_index, data: &Vec<u8>| -> anyhow::Result<()> {
let file_name = tag.to_segfile_name(segment_index as u32);
let header = new_tar_header(&file_name, data.len() as u64)?;
self.ar.append(&header, data.as_slice())?;
Ok(())
};
with_ondemand_download(|| self.timeline.get_rel_size(tag, self.lsn, false)).await?;
// If the relation is empty, create an empty file
if nblocks == 0 {
add_file(0, &vec![])?;
let file_name = tag.to_segfile_name(0);
let header = new_tar_header(&file_name, 0)?;
self.ar.append(&header, &mut io::empty()).await?;
return Ok(());
}
// Add a file for each chunk of blocks (aka segment)
let chunks = (0..nblocks).chunks(RELSEG_SIZE as usize);
for (seg, blocks) in chunks.into_iter().enumerate() {
let mut startblk = 0;
let mut seg = 0;
while startblk < nblocks {
let endblk = std::cmp::min(startblk + RELSEG_SIZE, nblocks);
let mut segment_data: Vec<u8> = vec![];
for blknum in blocks {
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
@@ -221,7 +221,12 @@ where
segment_data.extend_from_slice(&img[..]);
}
add_file(seg, &segment_data)?;
let file_name = tag.to_segfile_name(seg as u32);
let header = new_tar_header(&file_name, segment_data.len() as u64)?;
self.ar.append(&header, segment_data.as_slice()).await?;
seg += 1;
startblk = endblk;
}
Ok(())
@@ -230,17 +235,18 @@ where
//
// Generate SLRU segment files from repository.
//
fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
let nblocks = with_ondemand_download_sync(|| {
self.timeline.get_slru_segment_size(slru, segno, self.lsn)
})?;
async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
let nblocks =
with_ondemand_download(|| self.timeline.get_slru_segment_size(slru, segno, self.lsn))
.await?;
let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
for blknum in 0..nblocks {
let img = with_ondemand_download_sync(|| {
let img = with_ondemand_download(|| {
self.timeline
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn)
})?;
})
.await?;
if slru == SlruKind::Clog {
ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
@@ -253,7 +259,7 @@ where
let segname = format!("{}/{:>04X}", slru.to_str(), segno);
let header = new_tar_header(&segname, slru_buf.len() as u64)?;
self.ar.append(&header, slru_buf.as_slice())?;
self.ar.append(&header, slru_buf.as_slice()).await?;
trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
Ok(())
@@ -265,16 +271,16 @@ where
// Each directory contains a PG_VERSION file, and the default database
// directories also contain pg_filenode.map files.
//
fn add_dbdir(
async fn add_dbdir(
&mut self,
spcnode: u32,
dbnode: u32,
has_relmap_file: bool,
) -> anyhow::Result<()> {
let relmap_img = if has_relmap_file {
let img = with_ondemand_download_sync(|| {
self.timeline.get_relmap_file(spcnode, dbnode, self.lsn)
})?;
let img =
with_ondemand_download(|| self.timeline.get_relmap_file(spcnode, dbnode, self.lsn))
.await?;
ensure!(img.len() == 512);
Some(img)
} else {
@@ -284,14 +290,14 @@ where
if spcnode == GLOBALTABLESPACE_OID {
let pg_version_str = self.timeline.pg_version.to_string();
let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes())?;
self.ar.append(&header, pg_version_str.as_bytes()).await?;
info!("timeline.pg_version {}", self.timeline.pg_version);
if let Some(img) = relmap_img {
// filenode map for global tablespace
let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?;
self.ar.append(&header, &img[..])?;
self.ar.append(&header, &img[..]).await?;
} else {
warn!("global/pg_filenode.map is missing");
}
@@ -321,18 +327,18 @@ where
// Append dir path for each database
let path = format!("base/{}", dbnode);
let header = new_tar_header_dir(&path)?;
self.ar.append(&header, &mut io::empty())?;
self.ar.append(&header, &mut io::empty()).await?;
if let Some(img) = relmap_img {
let dst_path = format!("base/{}/PG_VERSION", dbnode);
let pg_version_str = self.timeline.pg_version.to_string();
let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes())?;
self.ar.append(&header, pg_version_str.as_bytes()).await?;
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
let header = new_tar_header(&relmap_path, img.len() as u64)?;
self.ar.append(&header, &img[..])?;
self.ar.append(&header, &img[..]).await?;
}
};
Ok(())
@@ -341,8 +347,8 @@ where
//
// Extract twophase state files
//
fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
let img = with_ondemand_download_sync(|| self.timeline.get_twophase_file(xid, self.lsn))?;
async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
let img = with_ondemand_download(|| self.timeline.get_twophase_file(xid, self.lsn)).await?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
@@ -350,7 +356,7 @@ where
buf.put_u32_le(crc);
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar.append(&header, &buf[..])?;
self.ar.append(&header, &buf[..]).await?;
Ok(())
}
@@ -359,7 +365,7 @@ where
// Add generated pg_control file and bootstrap WAL segment.
// Also send zenith.signal file with extra bootstrap data.
//
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
async fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
@@ -371,17 +377,19 @@ where
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
}
self.ar.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)?;
self.ar
.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)
.await?;
let checkpoint_bytes =
with_ondemand_download_sync(|| self.timeline.get_checkpoint(self.lsn))
.context("failed to get checkpoint bytes")?;
let pg_control_bytes =
with_ondemand_download_sync(|| self.timeline.get_control_file(self.lsn))
.context("failed get control bytes")?;
let checkpoint_bytes = with_ondemand_download(|| self.timeline.get_checkpoint(self.lsn))
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = with_ondemand_download(|| self.timeline.get_control_file(self.lsn))
.await
.context("failed get control bytes")?;
let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
&pg_control_bytes,
@@ -392,7 +400,7 @@ where
//send pg_control
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar.append(&header, &pg_control_bytes[..])?;
self.ar.append(&header, &pg_control_bytes[..]).await?;
//send wal segment
let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
@@ -404,24 +412,11 @@ where
postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..])?;
self.ar.append(&header, &wal_seg[..]).await?;
Ok(())
}
}
impl<'a, W> Drop for Basebackup<'a, W>
where
W: Write,
{
/// If the basebackup was not finished, prevent the Archive::drop() from
/// writing the end-of-archive marker.
fn drop(&mut self) {
if !self.finished {
self.ar.get_mut().abort();
}
}
}
//
// Create new tarball entry header
//
@@ -457,57 +452,3 @@ fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
header.set_cksum();
Ok(header)
}
/// A wrapper that passes through all data to the underlying Write,
/// until abort() is called.
///
/// tar::Builder has an annoying habit of finishing the archive with
/// a valid tar end-of-archive marker (two 512-byte sectors of zeros),
/// even if an error occurs and we don't finish building the archive.
/// We'd rather abort writing the tarball immediately than construct
/// a seemingly valid but incomplete archive. This wrapper allows us
/// to swallow the end-of-archive marker that Builder::drop() emits,
/// without writing it to the underlying sink.
///
struct AbortableWrite<W> {
w: W,
aborted: bool,
}
impl<W> AbortableWrite<W> {
pub fn new(w: W) -> Self {
AbortableWrite { w, aborted: false }
}
pub fn abort(&mut self) {
self.aborted = true;
}
}
impl<W> Write for AbortableWrite<W>
where
W: Write,
{
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
if self.aborted {
Ok(data.len())
} else {
self.w.write(data)
}
}
fn flush(&mut self) -> io::Result<()> {
if self.aborted {
Ok(())
} else {
self.w.flush()
}
}
}
fn with_ondemand_download_sync<F, T>(f: F) -> anyhow::Result<T>
where
F: Send + Fn() -> PageReconstructResult<T>,
T: Send,
{
task_mgr::COMPUTE_REQUEST_RUNTIME.block_on(with_ondemand_download(f))
}

View File

@@ -2,12 +2,13 @@
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
//! a neon Timeline.
//!
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use futures::StreamExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_tar::Archive;
use tracing::*;
use walkdir::WalkDir;
@@ -42,7 +43,7 @@ pub fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
pub fn import_timeline_from_postgres_datadir(
pub async fn import_timeline_from_postgres_datadir(
tline: &Timeline,
pgdata_path: &Path,
pgdata_lsn: Lsn,
@@ -65,9 +66,11 @@ pub fn import_timeline_from_postgres_datadir(
let absolute_path = entry.path();
let relative_path = absolute_path.strip_prefix(pgdata_path)?;
let file = File::open(absolute_path)?;
let mut file = tokio::fs::File::open(absolute_path).await?;
let len = metadata.len() as usize;
if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? {
if let Some(control_file) =
import_file(&mut modification, relative_path, &mut file, len).await?
{
pg_control = Some(control_file);
}
modification.flush()?;
@@ -102,12 +105,12 @@ pub fn import_timeline_from_postgres_datadir(
}
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
fn import_rel<Reader: Read>(
modification: &mut DatadirModification,
async fn import_rel(
modification: &mut DatadirModification<'_>,
path: &Path,
spcoid: Oid,
dboid: Oid,
mut reader: Reader,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
len: usize,
) -> anyhow::Result<()> {
// Does it look like a relation file?
@@ -148,7 +151,7 @@ fn import_rel<Reader: Read>(
}
loop {
let r = reader.read_exact(&mut buf);
let r = reader.read_exact(&mut buf).await;
match r {
Ok(_) => {
modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
@@ -181,11 +184,11 @@ fn import_rel<Reader: Read>(
/// Import an SLRU segment file
///
fn import_slru<Reader: Read>(
modification: &mut DatadirModification,
async fn import_slru(
modification: &mut DatadirModification<'_>,
slru: SlruKind,
path: &Path,
mut reader: Reader,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
len: usize,
) -> anyhow::Result<()> {
info!("importing slru file {path:?}");
@@ -206,7 +209,7 @@ fn import_slru<Reader: Read>(
let mut rpageno = 0;
loop {
let r = reader.read_exact(&mut buf);
let r = reader.read_exact(&mut buf).await;
match r {
Ok(_) => {
modification.put_slru_page_image(
@@ -243,6 +246,7 @@ fn import_wal(
startpoint: Lsn,
endpoint: Lsn,
) -> anyhow::Result<()> {
use std::io::Read;
let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
@@ -265,10 +269,11 @@ fn import_wal(
}
// Slurp the WAL file
let mut file = File::open(&path)?;
let mut file = std::fs::File::open(&path)?;
if offset > 0 {
file.seek(SeekFrom::Start(offset as u64))?;
use std::io::Seek;
file.seek(std::io::SeekFrom::Start(offset as u64))?;
}
let nread = file.read_to_end(&mut buf)?;
@@ -310,9 +315,9 @@ fn import_wal(
Ok(())
}
pub fn import_basebackup_from_tar<Reader: Read>(
pub async fn import_basebackup_from_tar(
tline: &Timeline,
reader: Reader,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {base_lsn}");
@@ -322,21 +327,24 @@ pub fn import_basebackup_from_tar<Reader: Read>(
let mut pg_control: Option<ControlFileData> = None;
// Import base
for base_tar_entry in tar::Archive::new(reader).entries()? {
let entry = base_tar_entry?;
let mut entries = Archive::new(reader).entries()?;
while let Some(base_tar_entry) = entries.next().await {
let mut entry = base_tar_entry?;
let header = entry.header();
let len = header.entry_size()? as usize;
let file_path = header.path()?.into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
if let Some(res) = import_file(&mut modification, file_path.as_ref(), entry, len)? {
tokio_tar::EntryType::Regular => {
if let Some(res) =
import_file(&mut modification, file_path.as_ref(), &mut entry, len).await?
{
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush()?;
}
tar::EntryType::Directory => {
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
}
_ => {
@@ -356,9 +364,9 @@ pub fn import_basebackup_from_tar<Reader: Read>(
Ok(())
}
pub fn import_wal_from_tar<Reader: Read>(
pub async fn import_wal_from_tar(
tline: &Timeline,
reader: Reader,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
start_lsn: Lsn,
end_lsn: Lsn,
) -> Result<()> {
@@ -371,16 +379,19 @@ pub fn import_wal_from_tar<Reader: Read>(
// Ingest wal until end_lsn
info!("importing wal until {}", end_lsn);
let mut pg_wal_tar = tar::Archive::new(reader);
let mut pg_wal_entries_iter = pg_wal_tar.entries()?;
let mut pg_wal_tar = Archive::new(reader);
let mut pg_wal_entries = pg_wal_tar.entries()?;
while last_lsn <= end_lsn {
let bytes = {
let entry = pg_wal_entries_iter.next().expect("expected more wal")?;
let mut entry = pg_wal_entries
.next()
.await
.ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
let header = entry.header();
let file_path = header.path()?.into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
tokio_tar::EntryType::Regular => {
// FIXME: assume postgresql tli 1 for now
let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
let file_name = file_path
@@ -390,9 +401,9 @@ pub fn import_wal_from_tar<Reader: Read>(
ensure!(expected_filename == file_name);
debug!("processing wal file {:?}", file_path);
read_all_bytes(entry)?
read_all_bytes(&mut entry).await?
}
tar::EntryType::Directory => {
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
continue;
}
@@ -433,7 +444,7 @@ pub fn import_wal_from_tar<Reader: Read>(
}
// Log any extra unused files
for e in &mut pg_wal_entries_iter {
while let Some(e) = pg_wal_entries.next().await {
let entry = e?;
let header = entry.header();
let file_path = header.path()?.into_owned();
@@ -443,10 +454,10 @@ pub fn import_wal_from_tar<Reader: Read>(
Ok(())
}
fn import_file<Reader: Read>(
modification: &mut DatadirModification,
async fn import_file(
modification: &mut DatadirModification<'_>,
file_path: &Path,
reader: Reader,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
len: usize,
) -> Result<Option<ControlFileData>> {
let file_name = match file_path.file_name() {
@@ -466,7 +477,7 @@ fn import_file<Reader: Read>(
match file_name.as_ref() {
"pg_control" => {
let bytes = read_all_bytes(reader)?;
let bytes = read_all_bytes(reader).await?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&bytes[..])?;
@@ -479,7 +490,7 @@ fn import_file<Reader: Read>(
return Ok(Some(pg_control));
}
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
let bytes = read_all_bytes(reader).await?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
debug!("imported relmap file")
}
@@ -487,7 +498,7 @@ fn import_file<Reader: Read>(
debug!("ignored PG_VERSION file");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
import_rel(modification, file_path, spcnode, dbnode, reader, len).await?;
debug!("imported rel creation");
}
}
@@ -502,7 +513,7 @@ fn import_file<Reader: Read>(
match file_name.as_ref() {
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
let bytes = read_all_bytes(reader).await?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
debug!("imported relmap file")
}
@@ -510,36 +521,36 @@ fn import_file<Reader: Read>(
debug!("ignored PG_VERSION file");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
import_rel(modification, file_path, spcnode, dbnode, reader, len).await?;
debug!("imported rel creation");
}
}
} else if file_path.starts_with("pg_xact") {
let slru = SlruKind::Clog;
import_slru(modification, slru, file_path, reader, len)?;
import_slru(modification, slru, file_path, reader, len).await?;
debug!("imported clog slru");
} else if file_path.starts_with("pg_multixact/offsets") {
let slru = SlruKind::MultiXactOffsets;
import_slru(modification, slru, file_path, reader, len)?;
import_slru(modification, slru, file_path, reader, len).await?;
debug!("imported multixact offsets slru");
} else if file_path.starts_with("pg_multixact/members") {
let slru = SlruKind::MultiXactMembers;
import_slru(modification, slru, file_path, reader, len)?;
import_slru(modification, slru, file_path, reader, len).await?;
debug!("imported multixact members slru");
} else if file_path.starts_with("pg_twophase") {
let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
let bytes = read_all_bytes(reader)?;
let bytes = read_all_bytes(reader).await?;
modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
debug!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
debug!("found wal file in base section. ignore it");
} else if file_path.starts_with("zenith.signal") {
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader)?;
let bytes = read_all_bytes(reader).await?;
// zenith.signal format is "PREV LSN: prev_lsn"
// TODO write serialization and deserialization in the same place.
let zenith_signal = std::str::from_utf8(&bytes)?.trim();
@@ -576,8 +587,8 @@ fn import_file<Reader: Read>(
Ok(None)
}
fn read_all_bytes<Reader: Read>(mut reader: Reader) -> Result<Bytes> {
async fn read_all_bytes(reader: &mut (impl AsyncRead + Send + Sync + Unpin)) -> Result<Bytes> {
let mut buf: Vec<u8> = vec![];
reader.read_to_end(&mut buf)?;
reader.read_to_end(&mut buf).await?;
Ok(Bytes::copy_from_slice(&buf[..]))
}

View File

@@ -26,9 +26,6 @@ use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::pin;
use tokio_util::io::StreamReader;
use tokio_util::io::SyncIoBridge;
use tracing::*;
use utils::id::ConnectionId;
use utils::{
@@ -395,9 +392,7 @@ impl PageServerHandler {
pgb.write_message(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let copyin_stream = copyin_stream(pgb);
pin!(copyin_stream);
let mut copyin_stream = Box::pin(copyin_stream(pgb));
timeline
.import_basebackup_from_tar(&mut copyin_stream, base_lsn)
.await?;
@@ -443,8 +438,8 @@ impl PageServerHandler {
pgb.write_message(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let mut copyin_stream = Box::pin(copyin_stream(pgb));
let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream));
tokio::task::block_in_place(|| import_wal_from_tar(&timeline, reader, start_lsn, end_lsn))?;
let mut reader = tokio_util::io::StreamReader::new(&mut copyin_stream);
import_wal_from_tar(&timeline, &mut reader, start_lsn, end_lsn).await?;
info!("wal import complete");
// Drain the rest of the Copy data
@@ -649,16 +644,14 @@ impl PageServerHandler {
pgb.flush().await?;
/* Send a tarball of the latest layer on the timeline */
let mut writer = CopyDataSink {
pgb,
rt: tokio::runtime::Handle::current(),
};
tokio::task::block_in_place(|| {
{
let mut writer = pgb.copyout_writer();
let basebackup =
basebackup::Basebackup::new(&mut writer, &timeline, lsn, prev_lsn, full_backup)?;
tracing::Span::current().record("lsn", basebackup.lsn.to_string().as_str());
basebackup.send_tarball()
})?;
basebackup.send_tarball().await?;
}
pgb.write_message(&BeMessage::CopyDone)?;
pgb.flush().await?;
info!("basebackup complete");
@@ -966,32 +959,3 @@ async fn get_active_timeline_with_timeout(
.await
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
}
///
/// A std::io::Write implementation that wraps all data written to it in CopyData
/// messages.
///
struct CopyDataSink<'a> {
pgb: &'a mut PostgresBackend,
rt: tokio::runtime::Handle,
}
impl<'a> io::Write for CopyDataSink<'a> {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
// CopyData
// FIXME: if the input is large, we should split it into multiple messages.
// Not sure what the threshold should be, but the ultimate hard limit is that
// the length cannot exceed u32.
// FIXME: flush isn't really required, but makes it easier
// to view in wireshark
self.pgb.write_message(&BeMessage::CopyData(data))?;
self.rt.block_on(self.pgb.flush())?;
trace!("CopyData sent for {} bytes!", data.len());
Ok(data.len())
}
fn flush(&mut self) -> io::Result<()> {
// no-op
Ok(())
}
}

View File

@@ -18,8 +18,6 @@ use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use tokio::sync::watch;
use tokio_util::io::StreamReader;
use tokio_util::io::SyncIoBridge;
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
@@ -36,7 +34,6 @@ use std::io::Write;
use std::ops::Bound::Included;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::Command;
use std::process::Stdio;
use std::sync::Arc;
@@ -236,21 +233,15 @@ impl UninitializedTimeline<'_> {
/// Prepares timeline data by loading it from the basebackup archive.
pub async fn import_basebackup_from_tar(
self,
mut copyin_stream: &mut Pin<&mut impl Stream<Item = io::Result<Bytes>>>,
copyin_stream: &mut (impl Stream<Item = io::Result<Bytes>> + Sync + Send + Unpin),
base_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
// import_basebackup_from_tar() is not async, mainly because the Tar crate
// it uses is not async. So we need to jump through some hoops:
// - convert the input from client connection to a synchronous Read
// - use block_in_place()
let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream));
tokio::task::block_in_place(|| {
import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn)
.context("Failed to import basebackup")
})?;
let mut reader = tokio_util::io::StreamReader::new(copyin_stream);
import_datadir::import_basebackup_from_tar(raw_timeline, &mut reader, base_lsn)
.await
.context("Failed to import basebackup")?;
// Flush loop needs to be spawned in order to be able to flush.
// We want to run proper checkpoint before we mark timeline as available to outside world
@@ -2139,13 +2130,12 @@ impl Tenant {
let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
tokio::task::block_in_place(|| {
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
pgdata_path,
pgdata_lsn,
)
})
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
pgdata_path,
pgdata_lsn,
)
.await
.with_context(|| {
format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
})?;

0
test_runner/regress/test_config.py Normal file → Executable file
View File