From 7997fc2932465b1c8854a64c2c053041eacdf80a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 25 May 2022 18:14:44 +0300 Subject: [PATCH] Fix error handling with 'basebackup' command. If the 'basebackup' command failed in the middle of building the tar archive, the client would not report the error, but would attempt to start up postgres with the partial contents of the data directory. That fails because the control file is missing (it's added to the archive last, precisely to make sure that you cannot start postgres from a partial archive). But the client doesn't see the proper error message that caused the basebackup to fail in the server, which is confusing. Two issues conspired to cause that: 1. The tar::Builder object that we use in the pageserver to construct the tar stream has a Drop handler that automatically writes a valid end-of-archive marker on drop. Because of that, the resulting tarball looks complete, even if an error happens while we're building it. The pageserver does send an ErrorResponse after the seemingly-valid tarball, but: 2. The client stops reading the Copy stream, as soon as it sees the tar end-of-archive marker. Therefore, it doesn't read the ErrorResponse that comes after it. We have two clients that call 'basebackup', one in `control_plane` used by the `neon_local` binary, and another one in `compute_tools`. Both had the same issue. This PR fixes both issues, even though fixing either one would be enough to fix the problem at hand. The pageserver now doesn't send the end-of-archive marker on error, and the client now reads the copy stream to the end, even if it sees an end-of-archive marker. Fixes github issue #1715 In passing, change Basebackup to use generic Write rather than 'dyn'. 
--- compute_tools/src/compute.rs | 8 +- control_plane/Cargo.toml | 2 +- control_plane/src/compute.rs | 9 +- pageserver/src/basebackup.rs | 90 +++++++++++++++++-- pageserver/src/page_service.rs | 3 +- .../batch_others/test_basebackup_error.py | 20 +++++ 6 files changed, 119 insertions(+), 13 deletions(-) create mode 100644 test_runner/batch_others/test_basebackup_error.py diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index a8422fb2b2..fd60b80305 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -146,8 +146,14 @@ impl ComputeNode { _ => format!("basebackup {} {} {}", &self.tenant, &self.timeline, lsn), }; let copyreader = client.copy_out(basebackup_cmd.as_str())?; - let mut ar = tar::Archive::new(copyreader); + // Read the archive directly from the `CopyOutReader` + // + // Set `ignore_zeros` so that unpack() reads all the Copy data and + // doesn't stop at the end-of-archive marker. Otherwise, if the server + // sends an Error after finishing the tarball, we will not notice it. 
+ let mut ar = tar::Archive::new(copyreader); + ar.set_ignore_zeros(true); ar.unpack(&self.pgdata)?; self.metrics.basebackup_ms.store( diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 41417aab9a..21311eea9a 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -tar = "0.4.33" +tar = "0.4.38" postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } serde = { version = "1.0", features = ["derive"] } serde_with = "1.12.0" diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 350cf74b7c..045acd7519 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -231,8 +231,13 @@ impl PostgresNode { .context("page server 'basebackup' command failed")?; // Read the archive directly from the `CopyOutReader` - tar::Archive::new(copyreader) - .unpack(&self.pgdata()) + // + // Set `ignore_zeros` so that unpack() reads all the Copy data and + // doesn't stop at the end-of-archive marker. Otherwise, if the server + // sends an Error after finishing the tarball, we will not notice it. + let mut ar = tar::Archive::new(copyreader); + ar.set_ignore_zeros(true); + ar.unpack(&self.pgdata()) .context("extracting base backup failed")?; Ok(()) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 92d35130d8..46d824b2e2 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -10,8 +10,9 @@ //! This module is responsible for creation of such tarball //! from data stored in object storage. //! 
-use anyhow::{anyhow, ensure, Context, Result}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{BufMut, BytesMut}; +use fail::fail_point; use std::fmt::Write as FmtWrite; use std::io; use std::io::Write; @@ -30,11 +31,16 @@ use utils::lsn::Lsn; /// This is short-living object only for the time of tarball creation, /// created mostly to avoid passing a lot of parameters between various functions /// used for constructing tarball. -pub struct Basebackup<'a> { - ar: Builder<&'a mut dyn Write>, +pub struct Basebackup<'a, W> +where + W: Write, +{ + ar: Builder>, timeline: &'a Arc, pub lsn: Lsn, prev_record_lsn: Lsn, + + finished: bool, } // Create basebackup with non-rel data in it. Omit relational data. @@ -44,12 +50,15 @@ pub struct Basebackup<'a> { // * When working without safekeepers. In this situation it is important to match the lsn // we are taking basebackup on with the lsn that is used in pageserver's walreceiver // to start the replication. -impl<'a> Basebackup<'a> { +impl<'a, W> Basebackup<'a, W> +where + W: Write, +{ pub fn new( - write: &'a mut dyn Write, + write: W, timeline: &'a Arc, req_lsn: Option, - ) -> Result> { + ) -> Result> { // Compute postgres doesn't have any previous WAL files, but the first // record that it's going to write needs to include the LSN of the // previous record (xl_prev). 
We include prev_record_lsn in the @@ -90,14 +99,15 @@ impl<'a> Basebackup<'a> { ); Ok(Basebackup { - ar: Builder::new(write), + ar: Builder::new(AbortableWrite::new(write)), timeline, lsn: backup_lsn, prev_record_lsn: backup_prev, + finished: false, }) } - pub fn send_tarball(&mut self) -> anyhow::Result<()> { + pub fn send_tarball(mut self) -> anyhow::Result<()> { // Create pgdata subdirs structure for dir in pg_constants::PGDATA_SUBDIRS.iter() { let header = new_tar_header_dir(*dir)?; @@ -135,9 +145,14 @@ impl<'a> Basebackup<'a> { self.add_twophase_file(xid)?; } + fail_point!("basebackup-before-control-file", |_| { + bail!("failpoint basebackup-before-control-file") + }); + // Generate pg_control and bootstrap WAL segment. self.add_pgcontrol_file()?; self.ar.finish()?; + self.finished = true; debug!("all tarred up!"); Ok(()) } @@ -331,6 +346,19 @@ impl<'a> Basebackup<'a> { } } +impl<'a, W> Drop for Basebackup<'a, W> +where + W: Write, +{ + /// If the basebackup was not finished, prevent the Archive::drop() from + /// writing the end-of-archive marker. + fn drop(&mut self) { + if !self.finished { + self.ar.get_mut().abort(); + } + } +} + // // Create new tarball entry header // @@ -366,3 +394,49 @@ fn new_tar_header_dir(path: &str) -> anyhow::Result
{ header.set_cksum(); Ok(header) } + +/// A wrapper that passes through all data to the underlying Write, +/// until abort() is called. +/// +/// tar::Builder has an annoying habit of finishing the archive with +/// a valid tar end-of-archive marker (two 512-byte sectors of zeros), +/// even if an error occurs and we don't finish building the archive. +/// We'd rather abort writing the tarball immediately than construct +/// a seemingly valid but incomplete archive. This wrapper allows us +/// to swallow the end-of-archive marker that Builder::drop() emits, +/// without writing it to the underlying sink. +/// +struct AbortableWrite { + w: W, + aborted: bool, +} + +impl AbortableWrite { + pub fn new(w: W) -> Self { + AbortableWrite { w, aborted: false } + } + + pub fn abort(&mut self) { + self.aborted = true; + } +} + +impl Write for AbortableWrite +where + W: Write, +{ + fn write(&mut self, data: &[u8]) -> io::Result { + if self.aborted { + Ok(data.len()) + } else { + self.w.write(data) + } + } + fn flush(&mut self) -> io::Result<()> { + if self.aborted { + Ok(()) + } else { + self.w.flush() + } + } +} diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 03264c9782..f54cd550b3 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -593,7 +593,8 @@ impl PageServerHandler { /* Send a tarball of the latest layer on the timeline */ { let mut writer = CopyDataSink { pgb }; - let mut basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?; + + let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?; span.record("lsn", &basebackup.lsn.to_string().as_str()); basebackup.send_tarball()?; } diff --git a/test_runner/batch_others/test_basebackup_error.py b/test_runner/batch_others/test_basebackup_error.py new file mode 100644 index 0000000000..4b8b8a746c --- /dev/null +++ b/test_runner/batch_others/test_basebackup_error.py @@ -0,0 +1,20 @@ +import pytest +from contextlib import closing 
+ +from fixtures.zenith_fixtures import ZenithEnv +from fixtures.log_helper import log + + +# +# Test error handling, if the 'basebackup' command fails in the middle +# of building the tar archive. +# +def test_basebackup_error(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + env.zenith_cli.create_branch("test_basebackup_error", "empty") + + # Introduce failpoint + env.pageserver.safe_psql(f"failpoints basebackup-before-control-file=return") + + with pytest.raises(Exception, match="basebackup-before-control-file"): + pg = env.postgres.create_start('test_basebackup_error')