Compare commits

...

5 Commits

Author SHA1 Message Date
Heikki Linnakangas
a77dd0700c Move startup tracing context handling 2024-05-03 09:28:19 +03:00
Em Sharnoff
c9472434c9 compute_ctl: Break up main() into discrete phases
This commit is intentionally designed to have as small a diff as
possible. To that end, the basic idea is that each distinct "chunk" of
the previous main() has been wrapped in its own function, with the
return values from each function being passed directly into the next.

The structure of main() is now visible from its contents:

  1. init()
  2. process_cli()
  3. wait_spec()
  4. start_postgres()
  5. wait_postgres()
  6. cleanup_and_exit()

There's a lot of other work that can / should(?) be done beyond this,
but I figure that's more opinionated, and this should be a solid start.
2024-05-01 12:07:46 -07:00
Em Sharnoff
f9c2945f74 compute_ctl: Non-functional prep changes to reduce diff
A couple lines moved further down in main(), and one case of using
Option<&str> instead of Option<&String>.
2024-05-01 12:06:55 -07:00
Alex Chi Z
5558457c84 chore(pageserver): categorize basebackup errors (#7523)
close https://github.com/neondatabase/neon/issues/7391

## Summary of changes

Categorize basebackup error into two types: server error and client
error. This makes it easier to set up alerts.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-01 16:31:59 +00:00
Alex Chi Z
26e6ff8ba6 chore(pageserver): concise error message for layer traversal (#7565)
Instead of showing the full path of layer traversal, we now only show
tenant (in tracing context)+timeline+filename.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-01 11:44:42 -04:00
5 changed files with 345 additions and 102 deletions

View File

@@ -51,6 +51,7 @@ use tracing::{error, info};
use url::Url;
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
@@ -68,6 +69,29 @@ use compute_tools::spec::*;
const BUILD_TAG_DEFAULT: &str = "latest";
fn main() -> Result<()> {
let (build_tag, clap_args) = init()?;
let (pg_handle, start_pg_result) =
{
// Enter startup tracing context
let _startup_context_guard = startup_context_from_env();
let cli_result = process_cli(&clap_args)?;
let wait_spec_result = wait_spec(build_tag, cli_result)?;
start_postgres(&clap_args, wait_spec_result)?
// Startup is finished, exit the startup tracing context
};
// PostgreSQL is now running, if startup was successful. Wait until it exits.
let wait_pg_result = wait_postgres(pg_handle)?;
cleanup_and_exit(start_pg_result, wait_pg_result)
}
fn init() -> Result<(String, clap::ArgMatches)> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
@@ -82,35 +106,11 @@ fn main() -> Result<()> {
.to_string();
info!("build_tag: {build_tag}");
let matches = cli().get_matches();
let pgbin_default = String::from("postgres");
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
let ext_remote_storage = matches
.get_one::<String>("remote-ext-config")
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
.map(|conf| {
if conf.starts_with("http") {
conf.trim_end_matches('/')
} else {
"http://pg-ext-s3-gateway"
}
});
let http_port = *matches
.get_one::<u16>("http-port")
.expect("http-port is required");
let pgdata = matches
.get_one::<String>("pgdata")
.expect("PGDATA path is required");
let connstr = matches
.get_one::<String>("connstr")
.expect("Postgres connection string is required");
let spec_json = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
Ok((build_tag, cli().get_matches()))
}
fn startup_context_from_env() -> Option<opentelemetry::ContextGuard>
{
// Extract OpenTelemetry context for the startup actions from the
// TRACEPARENT and TRACESTATE env variables, and attach it to the current
// tracing context.
@@ -147,7 +147,7 @@ fn main() -> Result<()> {
if let Ok(val) = std::env::var("TRACESTATE") {
startup_tracing_carrier.insert("tracestate".to_string(), val);
}
let startup_context_guard = if !startup_tracing_carrier.is_empty() {
if !startup_tracing_carrier.is_empty() {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
let guard = TraceContextPropagator::new()
@@ -157,8 +157,42 @@ fn main() -> Result<()> {
Some(guard)
} else {
None
};
}
}
fn process_cli(
matches: &clap::ArgMatches,
) -> Result<ProcessCliResult> {
let pgbin_default = "postgres";
let pgbin = matches
.get_one::<String>("pgbin")
.map(|s| s.as_str())
.unwrap_or(pgbin_default);
let ext_remote_storage = matches
.get_one::<String>("remote-ext-config")
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
.map(|conf| {
if conf.starts_with("http") {
conf.trim_end_matches('/')
} else {
"http://pg-ext-s3-gateway"
}
});
let http_port = *matches
.get_one::<u16>("http-port")
.expect("http-port is required");
let pgdata = matches
.get_one::<String>("pgdata")
.expect("PGDATA path is required");
let connstr = matches
.get_one::<String>("connstr")
.expect("Postgres connection string is required");
let spec_json = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
let compute_id = matches.get_one::<String>("compute-id");
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
@@ -199,6 +233,45 @@ fn main() -> Result<()> {
}
};
let result = ProcessCliResult {
// directly from CLI:
connstr,
pgdata,
pgbin,
ext_remote_storage,
http_port,
// others:
spec,
live_config_allowed,
};
Ok(result)
}
struct ProcessCliResult<'clap> {
connstr: &'clap str,
pgdata: &'clap str,
pgbin: &'clap str,
ext_remote_storage: Option<&'clap str>,
http_port: u16,
/// If a spec was provided via CLI or file, the [`ComputeSpec`]
spec: Option<ComputeSpec>,
live_config_allowed: bool,
}
fn wait_spec(
build_tag: String,
ProcessCliResult {
connstr,
pgdata,
pgbin,
ext_remote_storage,
http_port,
spec,
live_config_allowed,
}: ProcessCliResult,
) -> Result<WaitSpecResult> {
let mut new_state = ComputeState::new();
let spec_set;
@@ -237,8 +310,6 @@ fn main() -> Result<()> {
let _http_handle =
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
let extension_server_port: u16 = http_port;
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
@@ -255,6 +326,19 @@ fn main() -> Result<()> {
}
}
Ok(WaitSpecResult { compute, http_port })
}
struct WaitSpecResult {
compute: Arc<ComputeNode>,
// passed through from ProcessCliResult
http_port: u16,
}
fn start_postgres(
matches: &clap::ArgMatches,
WaitSpecResult { compute, http_port }: WaitSpecResult,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
@@ -281,9 +365,10 @@ fn main() -> Result<()> {
let _monitor_handle = launch_monitor(&compute);
let _configurator_handle = launch_configurator(&compute);
let extension_server_port: u16 = http_port;
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;
let pg = match compute.start_compute(extension_server_port) {
Ok(pg) => Some(pg),
Err(err) => {
@@ -334,7 +419,7 @@ fn main() -> Result<()> {
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();
let vm_monitor = &rt.as_ref().map(|rt| {
let vm_monitor = rt.as_ref().map(|rt| {
rt.spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: cgroup.cloned(),
@@ -347,12 +432,43 @@ fn main() -> Result<()> {
}
}
Ok((
pg,
StartPostgresResult {
delay_exit,
compute,
#[cfg(target_os = "linux")]
rt,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
vm_monitor,
},
))
}
type PostgresHandle = (std::process::Child, std::thread::JoinHandle<()>);
struct StartPostgresResult {
delay_exit: bool,
// passed through from WaitSpecResult
compute: Arc<ComputeNode>,
#[cfg(target_os = "linux")]
rt: Option<tokio::runtime::Runtime>,
#[cfg(target_os = "linux")]
token: tokio_util::sync::CancellationToken,
#[cfg(target_os = "linux")]
vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
}
fn wait_postgres(
pg: Option<PostgresHandle>,
) -> Result<WaitPostgresResult> {
// Wait for the child Postgres process forever. In this state Ctrl+C will
// propagate to Postgres and it will be shut down as well.
let mut exit_code = None;
if let Some((mut pg, logs_handle)) = pg {
// Startup is finished, exit the startup tracing span
drop(startup_context_guard);
let ecode = pg
.wait()
.expect("failed to start waiting on Postgres process");
@@ -367,6 +483,26 @@ fn main() -> Result<()> {
exit_code = ecode.code()
}
Ok(WaitPostgresResult { exit_code })
}
struct WaitPostgresResult {
exit_code: Option<i32>,
}
fn cleanup_and_exit(
StartPostgresResult {
mut delay_exit,
compute,
#[cfg(target_os = "linux")]
vm_monitor,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
rt,
}: StartPostgresResult,
WaitPostgresResult { exit_code }: WaitPostgresResult,
) -> Result<()> {
// Terminate the vm_monitor so it releases the file watcher on
// /sys/fs/cgroup/neon-postgres.
// Note: the vm-monitor only runs on linux because it requires cgroups.

View File

@@ -10,7 +10,7 @@
//! This module is responsible for creation of such tarball
//! from data stored in object storage.
//!
use anyhow::{anyhow, bail, ensure, Context};
use anyhow::{anyhow, Context};
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{key_to_slru_block, Key};
@@ -38,6 +38,14 @@ use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;
#[derive(Debug, thiserror::Error)]
pub enum BasebackupError {
#[error("basebackup pageserver error {0:#}")]
Server(#[from] anyhow::Error),
#[error("basebackup client error {0:#}")]
Client(#[source] io::Error),
}
/// Create basebackup with non-rel data in it.
/// Only include relational data if 'full_backup' is true.
///
@@ -53,7 +61,7 @@ pub async fn send_basebackup_tarball<'a, W>(
prev_lsn: Option<Lsn>,
full_backup: bool,
ctx: &'a RequestContext,
) -> anyhow::Result<()>
) -> Result<(), BasebackupError>
where
W: AsyncWrite + Send + Sync + Unpin,
{
@@ -92,8 +100,10 @@ where
// Consolidate the derived and the provided prev_lsn values
let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
if backup_prev != Lsn(0) {
ensure!(backup_prev == provided_prev_lsn);
if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
return Err(BasebackupError::Server(anyhow!(
"backup_prev {backup_prev} != provided_prev_lsn {provided_prev_lsn}"
)));
}
provided_prev_lsn
} else {
@@ -159,15 +169,26 @@ where
}
}
async fn add_block(&mut self, key: &Key, block: Bytes) -> anyhow::Result<()> {
async fn add_block(&mut self, key: &Key, block: Bytes) -> Result<(), BasebackupError> {
let (kind, segno, _) = key_to_slru_block(*key)?;
match kind {
SlruKind::Clog => {
ensure!(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8);
if !(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8) {
return Err(BasebackupError::Server(anyhow!(
"invalid SlruKind::Clog record: block.len()={}",
block.len()
)));
}
}
SlruKind::MultiXactMembers | SlruKind::MultiXactOffsets => {
ensure!(block.len() == BLCKSZ as usize);
if block.len() != BLCKSZ as usize {
return Err(BasebackupError::Server(anyhow!(
"invalid {:?} record: block.len()={}",
kind,
block.len()
)));
}
}
}
@@ -194,12 +215,15 @@ where
Ok(())
}
async fn flush(&mut self) -> anyhow::Result<()> {
async fn flush(&mut self) -> Result<(), BasebackupError> {
let nblocks = self.buf.len() / BLCKSZ as usize;
let (kind, segno) = self.current_segment.take().unwrap();
let segname = format!("{}/{:>04X}", kind.to_str(), segno);
let header = new_tar_header(&segname, self.buf.len() as u64)?;
self.ar.append(&header, self.buf.as_slice()).await?;
self.ar
.append(&header, self.buf.as_slice())
.await
.map_err(BasebackupError::Client)?;
self.total_blocks += nblocks;
debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
@@ -209,7 +233,7 @@ where
Ok(())
}
async fn finish(mut self) -> anyhow::Result<()> {
async fn finish(mut self) -> Result<(), BasebackupError> {
let res = if self.current_segment.is_none() || self.buf.is_empty() {
Ok(())
} else {
@@ -226,7 +250,7 @@ impl<'a, W> Basebackup<'a, W>
where
W: AsyncWrite + Send + Sync + Unpin,
{
async fn send_tarball(mut self) -> anyhow::Result<()> {
async fn send_tarball(mut self) -> Result<(), BasebackupError> {
// TODO include checksum
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
@@ -262,7 +286,8 @@ where
let slru_partitions = self
.timeline
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
.await?
.await
.map_err(|e| BasebackupError::Server(e.into()))?
.partition(
self.timeline.get_shard_identity(),
Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
@@ -271,10 +296,15 @@ where
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
for part in slru_partitions.parts {
let blocks = self.timeline.get_vectored(part, self.lsn, self.ctx).await?;
let blocks = self
.timeline
.get_vectored(part, self.lsn, self.ctx)
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
for (key, block) in blocks {
slru_builder.add_block(&key, block?).await?;
let block = block.map_err(|e| BasebackupError::Server(e.into()))?;
slru_builder.add_block(&key, block).await?;
}
}
slru_builder.finish().await?;
@@ -282,8 +312,11 @@ where
let mut min_restart_lsn: Lsn = Lsn::MAX;
// Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
for ((spcnode, dbnode), has_relmap_file) in self
.timeline
.list_dbdirs(self.lsn, self.ctx)
.await
.map_err(|e| BasebackupError::Server(e.into()))?
{
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
@@ -292,7 +325,8 @@ where
let rels = self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.await?;
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
@@ -315,7 +349,12 @@ where
}
}
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
for (path, content) in self
.timeline
.list_aux_files(self.lsn, self.ctx)
.await
.map_err(|e| BasebackupError::Server(e.into()))?
{
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
@@ -346,34 +385,41 @@ where
for xid in self
.timeline
.list_twophase_files(self.lsn, self.ctx)
.await?
.await
.map_err(|e| BasebackupError::Server(e.into()))?
{
self.add_twophase_file(xid).await?;
}
fail_point!("basebackup-before-control-file", |_| {
bail!("failpoint basebackup-before-control-file")
Err(BasebackupError::Server(anyhow!(
"failpoint basebackup-before-control-file"
)))
});
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file().await?;
self.ar.finish().await?;
self.ar.finish().await.map_err(BasebackupError::Client)?;
debug!("all tarred up!");
Ok(())
}
/// Add contents of relfilenode `src`, naming it as `dst`.
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
.await?;
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
// If the relation is empty, create an empty file
if nblocks == 0 {
let file_name = dst.to_segfile_name(0);
let header = new_tar_header(&file_name, 0)?;
self.ar.append(&header, &mut io::empty()).await?;
self.ar
.append(&header, &mut io::empty())
.await
.map_err(BasebackupError::Client)?;
return Ok(());
}
@@ -388,13 +434,17 @@ where
let img = self
.timeline
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), self.ctx)
.await?;
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
segment_data.extend_from_slice(&img[..]);
}
let file_name = dst.to_segfile_name(seg as u32);
let header = new_tar_header(&file_name, segment_data.len() as u64)?;
self.ar.append(&header, segment_data.as_slice()).await?;
self.ar
.append(&header, segment_data.as_slice())
.await
.map_err(BasebackupError::Client)?;
seg += 1;
startblk = endblk;
@@ -414,20 +464,22 @@ where
spcnode: u32,
dbnode: u32,
has_relmap_file: bool,
) -> anyhow::Result<()> {
) -> Result<(), BasebackupError> {
let relmap_img = if has_relmap_file {
let img = self
.timeline
.get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.await?;
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
ensure!(
img.len()
== dispatch_pgversion!(
self.timeline.pg_version,
pgv::bindings::SIZEOF_RELMAPFILE
)
);
if img.len()
!= dispatch_pgversion!(self.timeline.pg_version, pgv::bindings::SIZEOF_RELMAPFILE)
{
return Err(BasebackupError::Server(anyhow!(
"img.len() != SIZE_OF_RELMAPFILE, img.len()={}",
img.len(),
)));
}
Some(img)
} else {
@@ -440,14 +492,20 @@ where
ver => format!("{ver}\x0A"),
};
let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes()).await?;
self.ar
.append(&header, pg_version_str.as_bytes())
.await
.map_err(BasebackupError::Client)?;
info!("timeline.pg_version {}", self.timeline.pg_version);
if let Some(img) = relmap_img {
// filenode map for global tablespace
let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?;
self.ar.append(&header, &img[..]).await?;
self.ar
.append(&header, &img[..])
.await
.map_err(BasebackupError::Client)?;
} else {
warn!("global/pg_filenode.map is missing");
}
@@ -466,18 +524,26 @@ where
&& self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.await?
.await
.map_err(|e| BasebackupError::Server(e.into()))?
.is_empty()
{
return Ok(());
}
// User defined tablespaces are not supported
ensure!(spcnode == DEFAULTTABLESPACE_OID);
if spcnode != DEFAULTTABLESPACE_OID {
return Err(BasebackupError::Server(anyhow!(
"spcnode != DEFAULTTABLESPACE_OID, spcnode={spcnode}"
)));
}
// Append dir path for each database
let path = format!("base/{}", dbnode);
let header = new_tar_header_dir(&path)?;
self.ar.append(&header, &mut io::empty()).await?;
self.ar
.append(&header, &mut io::empty())
.await
.map_err(BasebackupError::Client)?;
if let Some(img) = relmap_img {
let dst_path = format!("base/{}/PG_VERSION", dbnode);
@@ -487,11 +553,17 @@ where
ver => format!("{ver}\x0A"),
};
let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes()).await?;
self.ar
.append(&header, pg_version_str.as_bytes())
.await
.map_err(BasebackupError::Client)?;
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
let header = new_tar_header(&relmap_path, img.len() as u64)?;
self.ar.append(&header, &img[..]).await?;
self.ar
.append(&header, &img[..])
.await
.map_err(BasebackupError::Client)?;
}
};
Ok(())
@@ -500,11 +572,12 @@ where
//
// Extract twophase state files
//
async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
async fn add_twophase_file(&mut self, xid: TransactionId) -> Result<(), BasebackupError> {
let img = self
.timeline
.get_twophase_file(xid, self.lsn, self.ctx)
.await?;
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
@@ -512,7 +585,10 @@ where
buf.put_u32_le(crc);
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar.append(&header, &buf[..]).await?;
self.ar
.append(&header, &buf[..])
.await
.map_err(BasebackupError::Client)?;
Ok(())
}
@@ -521,24 +597,28 @@ where
// Add generated pg_control file and bootstrap WAL segment.
// Also send zenith.signal file with extra bootstrap data.
//
async fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
async fn add_pgcontrol_file(&mut self) -> Result<(), BasebackupError> {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
write!(zenith_signal, "PREV LSN: none")
.map_err(|e| BasebackupError::Server(e.into()))?;
} else {
write!(zenith_signal, "PREV LSN: invalid")?;
write!(zenith_signal, "PREV LSN: invalid")
.map_err(|e| BasebackupError::Server(e.into()))?;
}
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)
.map_err(|e| BasebackupError::Server(e.into()))?;
}
self.ar
.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)
.await?;
.await
.map_err(BasebackupError::Client)?;
let checkpoint_bytes = self
.timeline
@@ -560,7 +640,10 @@ where
//send pg_control
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar.append(&header, &pg_control_bytes[..]).await?;
self.ar
.append(&header, &pg_control_bytes[..])
.await
.map_err(BasebackupError::Client)?;
//send wal segment
let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
@@ -575,8 +658,16 @@ where
self.lsn,
)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..]).await?;
if wal_seg.len() != WAL_SEGMENT_SIZE {
return Err(BasebackupError::Server(anyhow!(
"wal_seg.len() != WAL_SEGMENT_SIZE, wal_seg.len()={}",
wal_seg.len()
)));
}
self.ar
.append(&header, &wal_seg[..])
.await
.map_err(BasebackupError::Client)?;
Ok(())
}
}

View File

@@ -48,6 +48,7 @@ use utils::{
use crate::auth::check_permission;
use crate::basebackup;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
@@ -1236,6 +1237,13 @@ impl PageServerHandler {
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
fn map_basebackup_error(err: BasebackupError) -> QueryError {
match err {
BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
BasebackupError::Server(e) => QueryError::Other(e),
}
}
let started = std::time::Instant::now();
// check that the timeline exists
@@ -1261,7 +1269,8 @@ impl PageServerHandler {
let lsn_awaited_after = started.elapsed();
// switch client to COPYOUT
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
pgb.write_message_noflush(&BeMessage::CopyOutResponse)
.map_err(QueryError::Disconnected)?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
// Send a tarball of the latest layer on the timeline. Compress if not
@@ -1276,7 +1285,8 @@ impl PageServerHandler {
full_backup,
ctx,
)
.await?;
.await
.map_err(map_basebackup_error)?;
} else {
let mut writer = pgb.copyout_writer();
if gzip {
@@ -1297,9 +1307,13 @@ impl PageServerHandler {
full_backup,
ctx,
)
.await?;
.await
.map_err(map_basebackup_error)?;
// shutdown the encoder to ensure the gzip footer is written
encoder.shutdown().await?;
encoder
.shutdown()
.await
.map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
} else {
basebackup::send_basebackup_tarball(
&mut writer,
@@ -1309,11 +1323,13 @@ impl PageServerHandler {
full_backup,
ctx,
)
.await?;
.await
.map_err(map_basebackup_error)?;
}
}
pgb.write_message_noflush(&BeMessage::CopyDone)?;
pgb.write_message_noflush(&BeMessage::CopyDone)
.map_err(QueryError::Disconnected)?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
let basebackup_after = started

View File

@@ -401,8 +401,8 @@ impl Layer {
&self.0.path
}
pub(crate) fn local_path_str(&self) -> &Arc<str> {
&self.0.path_str
pub(crate) fn debug_str(&self) -> &Arc<str> {
&self.0.debug_str
}
pub(crate) fn metadata(&self) -> LayerFileMetadata {
@@ -527,8 +527,8 @@ struct LayerInner {
/// Full path to the file; unclear if this should exist anymore.
path: Utf8PathBuf,
/// String representation of the full path, used for traversal id.
path_str: Arc<str>,
/// String representation of the layer, used for traversal id.
debug_str: Arc<str>,
desc: PersistentLayerDesc,
@@ -735,7 +735,7 @@ impl LayerInner {
LayerInner {
conf,
path_str: path.to_string().into(),
debug_str: { format!("timelines/{}/{}", timeline.timeline_id, desc.filename()).into() },
path,
desc,
timeline: Arc::downgrade(timeline),

View File

@@ -2948,7 +2948,7 @@ trait TraversalLayerExt {
impl TraversalLayerExt for Layer {
fn traversal_id(&self) -> TraversalId {
Arc::clone(self.local_path_str())
Arc::clone(self.debug_str())
}
}