mirror of https://github.com/neondatabase/neon.git

Compare commits: http-conn-...compute_ct (5 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | a77dd0700c |  |
|  | c9472434c9 |  |
|  | f9c2945f74 |  |
|  | 5558457c84 |  |
|  | 26e6ff8ba6 |  |
@@ -51,6 +51,7 @@ use tracing::{error, info};
 use url::Url;

 use compute_api::responses::ComputeStatus;
+use compute_api::spec::ComputeSpec;
 use compute_tools::compute::{
     forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
@@ -68,6 +69,29 @@ use compute_tools::spec::*;

 const BUILD_TAG_DEFAULT: &str = "latest";

+fn main() -> Result<()> {
+    let (build_tag, clap_args) = init()?;
+
+    let (pg_handle, start_pg_result) =
+    {
+        // Enter startup tracing context
+        let _startup_context_guard = startup_context_from_env();
+
+        let cli_result = process_cli(&clap_args)?;
+
+        let wait_spec_result = wait_spec(build_tag, cli_result)?;
+
+        start_postgres(&clap_args, wait_spec_result)?
+
+        // Startup is finished, exit the startup tracing context
+    };
+
+    // PostgreSQL is now running, if startup was successful. Wait until it exits.
+    let wait_pg_result = wait_postgres(pg_handle)?;
+
+    cleanup_and_exit(start_pg_result, wait_pg_result)
+}
+
+fn init() -> Result<(String, clap::ArgMatches)> {
     init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;

     let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
@@ -82,35 +106,11 @@ fn main() -> Result<()> {
         .to_string();
     info!("build_tag: {build_tag}");

-    let matches = cli().get_matches();
-    let pgbin_default = String::from("postgres");
-    let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
-
-    let ext_remote_storage = matches
-        .get_one::<String>("remote-ext-config")
-        // Compatibility hack: if the control plane specified any remote-ext-config
-        // use the default value for extension storage proxy gateway.
-        // Remove this once the control plane is updated to pass the gateway URL
-        .map(|conf| {
-            if conf.starts_with("http") {
-                conf.trim_end_matches('/')
-            } else {
-                "http://pg-ext-s3-gateway"
-            }
-        });
-
-    let http_port = *matches
-        .get_one::<u16>("http-port")
-        .expect("http-port is required");
-    let pgdata = matches
-        .get_one::<String>("pgdata")
-        .expect("PGDATA path is required");
-    let connstr = matches
-        .get_one::<String>("connstr")
-        .expect("Postgres connection string is required");
-    let spec_json = matches.get_one::<String>("spec");
-    let spec_path = matches.get_one::<String>("spec-path");
+    Ok((build_tag, cli().get_matches()))
 }

+fn startup_context_from_env() -> Option<opentelemetry::ContextGuard>
+{
     // Extract OpenTelemetry context for the startup actions from the
     // TRACEPARENT and TRACESTATE env variables, and attach it to the current
     // tracing context.
@@ -147,7 +147,7 @@ fn main() -> Result<()> {
     if let Ok(val) = std::env::var("TRACESTATE") {
         startup_tracing_carrier.insert("tracestate".to_string(), val);
     }
-    let startup_context_guard = if !startup_tracing_carrier.is_empty() {
+    if !startup_tracing_carrier.is_empty() {
         use opentelemetry::propagation::TextMapPropagator;
         use opentelemetry::sdk::propagation::TraceContextPropagator;
         let guard = TraceContextPropagator::new()
@@ -157,8 +157,42 @@ fn main() -> Result<()> {
         Some(guard)
     } else {
         None
-    };
+    }
 }

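The hunks above show `startup_context_from_env()` only in fragments. For orientation, here is a stitched-together sketch of the whole extraction flow, assuming the pre-0.20 `opentelemetry` API that the imports above point at (the `TRACEPARENT` branch mirrors the `TRACESTATE` branch shown in the diff):

```rust
use std::collections::HashMap;

use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;

fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
    // Collect the W3C trace-context headers from the environment.
    let mut carrier = HashMap::new();
    if let Ok(val) = std::env::var("TRACEPARENT") {
        carrier.insert("traceparent".to_string(), val);
    }
    if let Ok(val) = std::env::var("TRACESTATE") {
        carrier.insert("tracestate".to_string(), val);
    }
    if !carrier.is_empty() {
        // Deserialize the remote context and attach it to the current thread;
        // dropping the returned guard detaches it again, which is why main()
        // holds it only for the duration of the startup block.
        let ctx = TraceContextPropagator::new().extract(&carrier);
        Some(ctx.attach())
    } else {
        None
    }
}
```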
+fn process_cli(
+    matches: &clap::ArgMatches,
+) -> Result<ProcessCliResult> {
+    let pgbin_default = "postgres";
+    let pgbin = matches
+        .get_one::<String>("pgbin")
+        .map(|s| s.as_str())
+        .unwrap_or(pgbin_default);
+
+    let ext_remote_storage = matches
+        .get_one::<String>("remote-ext-config")
+        // Compatibility hack: if the control plane specified any remote-ext-config
+        // use the default value for extension storage proxy gateway.
+        // Remove this once the control plane is updated to pass the gateway URL
+        .map(|conf| {
+            if conf.starts_with("http") {
+                conf.trim_end_matches('/')
+            } else {
+                "http://pg-ext-s3-gateway"
+            }
+        });
+
+    let http_port = *matches
+        .get_one::<u16>("http-port")
+        .expect("http-port is required");
+    let pgdata = matches
+        .get_one::<String>("pgdata")
+        .expect("PGDATA path is required");
+    let connstr = matches
+        .get_one::<String>("connstr")
+        .expect("Postgres connection string is required");
+    let spec_json = matches.get_one::<String>("spec");
+    let spec_path = matches.get_one::<String>("spec-path");
+    let compute_id = matches.get_one::<String>("compute-id");
+    let control_plane_uri = matches.get_one::<String>("control-plane-uri");
+
@@ -199,6 +233,45 @@ fn main() -> Result<()> {
         }
     };

+    let result = ProcessCliResult {
+        // directly from CLI:
+        connstr,
+        pgdata,
+        pgbin,
+        ext_remote_storage,
+        http_port,
+        // others:
+        spec,
+        live_config_allowed,
+    };
+
+    Ok(result)
+}
+
+struct ProcessCliResult<'clap> {
+    connstr: &'clap str,
+    pgdata: &'clap str,
+    pgbin: &'clap str,
+    ext_remote_storage: Option<&'clap str>,
+    http_port: u16,
+
+    /// If a spec was provided via CLI or file, the [`ComputeSpec`]
+    spec: Option<ComputeSpec>,
+    live_config_allowed: bool,
+}

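A side note on the style used by `wait_spec()` and the later stages: a struct can be destructured directly in a function's parameter list, so each stage consumes the previous stage's output field-by-field. A minimal, self-contained illustration (all names here are hypothetical):

```rust
struct StageOutput {
    port: u16,
    name: String,
}

// The pattern in parameter position binds `port` and `name` directly,
// exactly as `let StageOutput { port, name } = arg;` would on the first line.
fn next_stage(StageOutput { port, name }: StageOutput) {
    println!("{name} listens on {port}");
}

fn main() {
    next_stage(StageOutput {
        port: 3080,
        name: "compute".to_string(),
    });
}
```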
+fn wait_spec(
+    build_tag: String,
+    ProcessCliResult {
+        connstr,
+        pgdata,
+        pgbin,
+        ext_remote_storage,
+        http_port,
+        spec,
+        live_config_allowed,
+    }: ProcessCliResult,
+) -> Result<WaitSpecResult> {
+    let mut new_state = ComputeState::new();
+    let spec_set;
+
@@ -237,8 +310,6 @@ fn main() -> Result<()> {
     let _http_handle =
         launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");

-    let extension_server_port: u16 = http_port;
-
     if !spec_set {
         // No spec provided, hang waiting for it.
         info!("no compute spec provided, waiting");
@@ -255,6 +326,19 @@ fn main() -> Result<()> {
         }
     }

+    Ok(WaitSpecResult { compute, http_port })
+}
+
+struct WaitSpecResult {
+    compute: Arc<ComputeNode>,
+    // passed through from ProcessCliResult
+    http_port: u16,
+}

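The body of the `if !spec_set` branch is elided by the hunk, but it waits for a spec to arrive over the HTTP endpoint. The diff does not show the mechanism; a plausible minimal sketch of such a wait uses a `Mutex`/`Condvar` pair (the field names are illustrative, not the actual `ComputeNode` API):

```rust
use std::sync::{Condvar, Mutex};

struct Node {
    // None until a spec has been posted to the HTTP endpoint.
    spec: Mutex<Option<String>>,
    state_changed: Condvar,
}

fn wait_for_spec(node: &Node) -> String {
    let mut guard = node.spec.lock().unwrap();
    // Condvars can wake spuriously, so re-check the predicate in a loop.
    while guard.is_none() {
        guard = node.state_changed.wait(guard).unwrap();
    }
    guard.take().unwrap()
}
```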
+fn start_postgres(
+    matches: &clap::ArgMatches,
+    WaitSpecResult { compute, http_port }: WaitSpecResult,
+) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
+    // We got all we need, update the state.
+    let mut state = compute.state.lock().unwrap();
+
@@ -281,9 +365,10 @@ fn main() -> Result<()> {
     let _monitor_handle = launch_monitor(&compute);
     let _configurator_handle = launch_configurator(&compute);

+    let extension_server_port: u16 = http_port;
+
     // Start Postgres
     let mut delay_exit = false;
     let mut exit_code = None;
     let pg = match compute.start_compute(extension_server_port) {
         Ok(pg) => Some(pg),
         Err(err) => {
@@ -334,7 +419,7 @@ fn main() -> Result<()> {
         // This token is used internally by the monitor to clean up all threads
         let token = CancellationToken::new();

-        let vm_monitor = &rt.as_ref().map(|rt| {
+        let vm_monitor = rt.as_ref().map(|rt| {
             rt.spawn(vm_monitor::start(
                 Box::leak(Box::new(vm_monitor::Args {
                     cgroup: cgroup.cloned(),
@@ -347,12 +432,43 @@ fn main() -> Result<()> {
         }
     }

+    Ok((
+        pg,
+        StartPostgresResult {
+            delay_exit,
+            compute,
+            #[cfg(target_os = "linux")]
+            rt,
+            #[cfg(target_os = "linux")]
+            token,
+            #[cfg(target_os = "linux")]
+            vm_monitor,
+        },
+    ))
+}
+
+type PostgresHandle = (std::process::Child, std::thread::JoinHandle<()>);
+
+struct StartPostgresResult {
+    delay_exit: bool,
+    // passed through from WaitSpecResult
+    compute: Arc<ComputeNode>,
+
+    #[cfg(target_os = "linux")]
+    rt: Option<tokio::runtime::Runtime>,
+    #[cfg(target_os = "linux")]
+    token: tokio_util::sync::CancellationToken,
+    #[cfg(target_os = "linux")]
+    vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
+}

+fn wait_postgres(
+    pg: Option<PostgresHandle>,
+) -> Result<WaitPostgresResult> {
     // Wait for the child Postgres process forever. In this state Ctrl+C will
     // propagate to Postgres and it will be shut down as well.
     let mut exit_code = None;
     if let Some((mut pg, logs_handle)) = pg {
-        // Startup is finished, exit the startup tracing span
-        drop(startup_context_guard);
-
         let ecode = pg
             .wait()
             .expect("failed to start waiting on Postgres process");
@@ -367,6 +483,26 @@ fn main() -> Result<()> {
         exit_code = ecode.code()
     }

+    Ok(WaitPostgresResult { exit_code })
+}
+
+struct WaitPostgresResult {
+    exit_code: Option<i32>,
+}

+fn cleanup_and_exit(
+    StartPostgresResult {
+        mut delay_exit,
+        compute,
+        #[cfg(target_os = "linux")]
+        vm_monitor,
+        #[cfg(target_os = "linux")]
+        token,
+        #[cfg(target_os = "linux")]
+        rt,
+    }: StartPostgresResult,
+    WaitPostgresResult { exit_code }: WaitPostgresResult,
+) -> Result<()> {
     // Terminate the vm_monitor so it releases the file watcher on
     // /sys/fs/cgroup/neon-postgres.
     // Note: the vm-monitor only runs on linux because it requires cgroups.

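One detail worth calling out in `cleanup_and_exit()`'s signature: `#[cfg(target_os = "linux")]` is applied to individual struct fields *and* to the matching fields of the destructuring pattern, so the vm-monitor plumbing compiles out entirely on non-Linux targets. A minimal sketch with hypothetical names:

```rust
struct Handles {
    delay_exit: bool,
    #[cfg(target_os = "linux")]
    monitor: Option<u32>, // stand-in for the real JoinHandle
}

// The same #[cfg] must appear on the pattern field, or the pattern would
// not match the struct's shape on non-Linux builds.
fn cleanup(
    Handles {
        delay_exit,
        #[cfg(target_os = "linux")]
        monitor,
    }: Handles,
) {
    let _ = delay_exit;
    #[cfg(target_os = "linux")]
    drop(monitor);
}

fn main() {
    cleanup(Handles {
        delay_exit: false,
        #[cfg(target_os = "linux")]
        monitor: None,
    });
}
```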
@@ -10,7 +10,7 @@
 //! This module is responsible for creation of such tarball
 //! from data stored in object storage.
 //!
-use anyhow::{anyhow, bail, ensure, Context};
+use anyhow::{anyhow, Context};
 use bytes::{BufMut, Bytes, BytesMut};
 use fail::fail_point;
 use pageserver_api::key::{key_to_slru_block, Key};
@@ -38,6 +38,14 @@ use postgres_ffi::PG_TLI;
 use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
 use utils::lsn::Lsn;

+#[derive(Debug, thiserror::Error)]
+pub enum BasebackupError {
+    #[error("basebackup pageserver error {0:#}")]
+    Server(#[from] anyhow::Error),
+    #[error("basebackup client error {0:#}")]
+    Client(#[source] io::Error),
+}

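For readers less familiar with thiserror: below is a rough hand-rolled equivalent of what the derive above generates (a sketch only; the real expansion also implements `std::error::Error` with `source()`). The `#[from]` on `Server` is what lets the rest of this diff keep using plain `?` on anyhow-returning helpers such as `new_tar_header()`:

```rust
use std::fmt;
use std::io;

#[derive(Debug)]
pub enum BasebackupError {
    Server(anyhow::Error),
    Client(io::Error),
}

impl fmt::Display for BasebackupError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // "{0:#}" in the #[error] attribute formats the full anyhow chain.
            BasebackupError::Server(e) => write!(f, "basebackup pageserver error {e:#}"),
            BasebackupError::Client(e) => write!(f, "basebackup client error {e:#}"),
        }
    }
}

// Generated by #[from]: the impl that `?` uses to auto-convert
// anyhow::Error into BasebackupError::Server.
impl From<anyhow::Error> for BasebackupError {
    fn from(e: anyhow::Error) -> Self {
        BasebackupError::Server(e)
    }
}
```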
 /// Create basebackup with non-rel data in it.
 /// Only include relational data if 'full_backup' is true.
 ///
@@ -53,7 +61,7 @@ pub async fn send_basebackup_tarball<'a, W>(
     prev_lsn: Option<Lsn>,
     full_backup: bool,
     ctx: &'a RequestContext,
-) -> anyhow::Result<()>
+) -> Result<(), BasebackupError>
 where
     W: AsyncWrite + Send + Sync + Unpin,
 {
@@ -92,8 +100,10 @@ where

     // Consolidate the derived and the provided prev_lsn values
     let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
-        if backup_prev != Lsn(0) {
-            ensure!(backup_prev == provided_prev_lsn);
+        if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
+            return Err(BasebackupError::Server(anyhow!(
+                "backup_prev {backup_prev} != provided_prev_lsn {provided_prev_lsn}"
+            )));
         }
         provided_prev_lsn
     } else {
@@ -159,15 +169,26 @@ where
         }
     }

-    async fn add_block(&mut self, key: &Key, block: Bytes) -> anyhow::Result<()> {
+    async fn add_block(&mut self, key: &Key, block: Bytes) -> Result<(), BasebackupError> {
         let (kind, segno, _) = key_to_slru_block(*key)?;

         match kind {
             SlruKind::Clog => {
-                ensure!(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8);
+                if !(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8) {
+                    return Err(BasebackupError::Server(anyhow!(
+                        "invalid SlruKind::Clog record: block.len()={}",
+                        block.len()
+                    )));
+                }
             }
             SlruKind::MultiXactMembers | SlruKind::MultiXactOffsets => {
-                ensure!(block.len() == BLCKSZ as usize);
+                if block.len() != BLCKSZ as usize {
+                    return Err(BasebackupError::Server(anyhow!(
+                        "invalid {:?} record: block.len()={}",
+                        kind,
+                        block.len()
+                    )));
+                }
             }
         }
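anyhow's `ensure!`/`bail!` macros expand to `return Err(anyhow!(...))`, which no longer type-checks once a function returns `Result<(), BasebackupError>`; they also could not express the client/server distinction. Each check therefore becomes an explicit conditional, as above. The general shape of the rewrite, with a hypothetical helper and assuming the `BasebackupError` enum from this diff:

```rust
use anyhow::anyhow;

fn check_block_len(len: usize, expected: usize) -> Result<(), BasebackupError> {
    // before: ensure!(len == expected);
    if len != expected {
        // Wrong sizes come from pageserver-side data, hence Server, not Client.
        return Err(BasebackupError::Server(anyhow!(
            "invalid record: len={len}, expected={expected}"
        )));
    }
    Ok(())
}
```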
@@ -194,12 +215,15 @@ where
         Ok(())
     }

-    async fn flush(&mut self) -> anyhow::Result<()> {
+    async fn flush(&mut self) -> Result<(), BasebackupError> {
         let nblocks = self.buf.len() / BLCKSZ as usize;
         let (kind, segno) = self.current_segment.take().unwrap();
         let segname = format!("{}/{:>04X}", kind.to_str(), segno);
         let header = new_tar_header(&segname, self.buf.len() as u64)?;
-        self.ar.append(&header, self.buf.as_slice()).await?;
+        self.ar
+            .append(&header, self.buf.as_slice())
+            .await
+            .map_err(BasebackupError::Client)?;

         self.total_blocks += nblocks;
         debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
@@ -209,7 +233,7 @@ where
         Ok(())
     }

-    async fn finish(mut self) -> anyhow::Result<()> {
+    async fn finish(mut self) -> Result<(), BasebackupError> {
         let res = if self.current_segment.is_none() || self.buf.is_empty() {
             Ok(())
         } else {
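Two `map_err` spellings recur throughout this diff, and the difference is worth a note: tar-writer failures are `io::Error` and can use the tuple-variant constructor `BasebackupError::Client` directly as a function, while timeline-read failures are other error types that first need an `Into` conversion inside a closure. A compact sketch, assuming the `BasebackupError` enum above (`demo` and its inputs are hypothetical):

```rust
use std::io;

fn demo(
    write_res: Result<(), io::Error>,      // e.g. from the tar writer
    read_res: Result<(), std::fmt::Error>, // stand-in for a timeline read error
) -> Result<(), BasebackupError> {
    // An enum tuple variant is itself a fn(io::Error) -> BasebackupError.
    write_res.map_err(BasebackupError::Client)?;
    // Other error types are first converted to anyhow::Error via Into.
    read_res.map_err(|e| BasebackupError::Server(e.into()))?;
    Ok(())
}
```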
@@ -226,7 +250,7 @@ impl<'a, W> Basebackup<'a, W>
 where
     W: AsyncWrite + Send + Sync + Unpin,
 {
-    async fn send_tarball(mut self) -> anyhow::Result<()> {
+    async fn send_tarball(mut self) -> Result<(), BasebackupError> {
         // TODO include checksum

         let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
@@ -262,7 +286,8 @@ where
         let slru_partitions = self
             .timeline
             .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
-            .await?
+            .await
+            .map_err(|e| BasebackupError::Server(e.into()))?
             .partition(
                 self.timeline.get_shard_identity(),
                 Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
@@ -271,10 +296,15 @@ where
         let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);

         for part in slru_partitions.parts {
-            let blocks = self.timeline.get_vectored(part, self.lsn, self.ctx).await?;
+            let blocks = self
+                .timeline
+                .get_vectored(part, self.lsn, self.ctx)
+                .await
+                .map_err(|e| BasebackupError::Server(e.into()))?;

             for (key, block) in blocks {
-                slru_builder.add_block(&key, block?).await?;
+                let block = block.map_err(|e| BasebackupError::Server(e.into()))?;
+                slru_builder.add_block(&key, block).await?;
             }
         }
         slru_builder.finish().await?;
@@ -282,8 +312,11 @@ where

         let mut min_restart_lsn: Lsn = Lsn::MAX;
         // Create tablespace directories
-        for ((spcnode, dbnode), has_relmap_file) in
-            self.timeline.list_dbdirs(self.lsn, self.ctx).await?
+        for ((spcnode, dbnode), has_relmap_file) in self
+            .timeline
+            .list_dbdirs(self.lsn, self.ctx)
+            .await
+            .map_err(|e| BasebackupError::Server(e.into()))?
         {
             self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;

@@ -292,7 +325,8 @@ where
             let rels = self
                 .timeline
                 .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
-                .await?;
+                .await
+                .map_err(|e| BasebackupError::Server(e.into()))?;
             for &rel in rels.iter() {
                 // Send init fork as main fork to provide well formed empty
                 // contents of UNLOGGED relations. Postgres copies it in
@@ -315,7 +349,12 @@ where
             }
         }

-        for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
+        for (path, content) in self
+            .timeline
+            .list_aux_files(self.lsn, self.ctx)
+            .await
+            .map_err(|e| BasebackupError::Server(e.into()))?
+        {
             if path.starts_with("pg_replslot") {
                 let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
                 let restart_lsn = Lsn(u64::from_le_bytes(
@@ -346,34 +385,41 @@ where
         for xid in self
             .timeline
             .list_twophase_files(self.lsn, self.ctx)
-            .await?
+            .await
+            .map_err(|e| BasebackupError::Server(e.into()))?
         {
             self.add_twophase_file(xid).await?;
         }

         fail_point!("basebackup-before-control-file", |_| {
-            bail!("failpoint basebackup-before-control-file")
+            Err(BasebackupError::Server(anyhow!(
+                "failpoint basebackup-before-control-file"
+            )))
         });

         // Generate pg_control and bootstrap WAL segment.
         self.add_pgcontrol_file().await?;
-        self.ar.finish().await?;
+        self.ar.finish().await.map_err(BasebackupError::Client)?;
         debug!("all tarred up!");
         Ok(())
     }
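The failpoint above changes payload, not behavior: with the `fail` crate, the closure form of `fail_point!` early-returns the closure's value from the enclosing function when the failpoint is enabled, so the closure's type must now be `Result<(), BasebackupError>`. A minimal sketch under that assumption (failpoint and function names shortened here):

```rust
use anyhow::anyhow;
use fail::fail_point;

fn do_step() -> Result<(), BasebackupError> {
    // When triggered (e.g. FAILPOINTS=before-control-file=return), this
    // returns the closure's Err(...) from do_step(); otherwise it is a no-op.
    fail_point!("before-control-file", |_| {
        Err(BasebackupError::Server(anyhow!(
            "failpoint before-control-file"
        )))
    });
    Ok(())
}
```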
     /// Add contents of relfilenode `src`, naming it as `dst`.
-    async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
+    async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
         let nblocks = self
             .timeline
             .get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
-            .await?;
+            .await
+            .map_err(|e| BasebackupError::Server(e.into()))?;

         // If the relation is empty, create an empty file
         if nblocks == 0 {
             let file_name = dst.to_segfile_name(0);
             let header = new_tar_header(&file_name, 0)?;
-            self.ar.append(&header, &mut io::empty()).await?;
+            self.ar
+                .append(&header, &mut io::empty())
+                .await
+                .map_err(BasebackupError::Client)?;
             return Ok(());
         }

@@ -388,13 +434,17 @@ where
             let img = self
                 .timeline
                 .get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), self.ctx)
-                .await?;
+                .await
+                .map_err(|e| BasebackupError::Server(e.into()))?;
             segment_data.extend_from_slice(&img[..]);
         }

         let file_name = dst.to_segfile_name(seg as u32);
         let header = new_tar_header(&file_name, segment_data.len() as u64)?;
-        self.ar.append(&header, segment_data.as_slice()).await?;
+        self.ar
+            .append(&header, segment_data.as_slice())
+            .await
+            .map_err(BasebackupError::Client)?;

         seg += 1;
         startblk = endblk;
@@ -414,20 +464,22 @@ where
         spcnode: u32,
         dbnode: u32,
         has_relmap_file: bool,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), BasebackupError> {
         let relmap_img = if has_relmap_file {
             let img = self
                 .timeline
                 .get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
-                .await?;
+                .await
+                .map_err(|e| BasebackupError::Server(e.into()))?;

-            ensure!(
-                img.len()
-                    == dispatch_pgversion!(
-                        self.timeline.pg_version,
-                        pgv::bindings::SIZEOF_RELMAPFILE
-                    )
-            );
+            if img.len()
+                != dispatch_pgversion!(self.timeline.pg_version, pgv::bindings::SIZEOF_RELMAPFILE)
+            {
+                return Err(BasebackupError::Server(anyhow!(
+                    "img.len() != SIZE_OF_RELMAPFILE, img.len()={}",
+                    img.len(),
+                )));
+            }

             Some(img)
         } else {
@@ -440,14 +492,20 @@ where
                 ver => format!("{ver}\x0A"),
             };
             let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
-            self.ar.append(&header, pg_version_str.as_bytes()).await?;
+            self.ar
+                .append(&header, pg_version_str.as_bytes())
+                .await
+                .map_err(BasebackupError::Client)?;

             info!("timeline.pg_version {}", self.timeline.pg_version);

             if let Some(img) = relmap_img {
                 // filenode map for global tablespace
                 let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?;
-                self.ar.append(&header, &img[..]).await?;
+                self.ar
+                    .append(&header, &img[..])
+                    .await
+                    .map_err(BasebackupError::Client)?;
             } else {
                 warn!("global/pg_filenode.map is missing");
             }
@@ -466,18 +524,26 @@ where
                 && self
                     .timeline
                     .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
-                    .await?
+                    .await
+                    .map_err(|e| BasebackupError::Server(e.into()))?
                     .is_empty()
             {
                 return Ok(());
             }
             // User defined tablespaces are not supported
-            ensure!(spcnode == DEFAULTTABLESPACE_OID);
+            if spcnode != DEFAULTTABLESPACE_OID {
+                return Err(BasebackupError::Server(anyhow!(
+                    "spcnode != DEFAULTTABLESPACE_OID, spcnode={spcnode}"
+                )));
+            }

             // Append dir path for each database
             let path = format!("base/{}", dbnode);
             let header = new_tar_header_dir(&path)?;
-            self.ar.append(&header, &mut io::empty()).await?;
+            self.ar
+                .append(&header, &mut io::empty())
+                .await
+                .map_err(BasebackupError::Client)?;

             if let Some(img) = relmap_img {
                 let dst_path = format!("base/{}/PG_VERSION", dbnode);
@@ -487,11 +553,17 @@ where
                 ver => format!("{ver}\x0A"),
             };
             let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
-            self.ar.append(&header, pg_version_str.as_bytes()).await?;
+            self.ar
+                .append(&header, pg_version_str.as_bytes())
+                .await
+                .map_err(BasebackupError::Client)?;

             let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
             let header = new_tar_header(&relmap_path, img.len() as u64)?;
-            self.ar.append(&header, &img[..]).await?;
+            self.ar
+                .append(&header, &img[..])
+                .await
+                .map_err(BasebackupError::Client)?;
         }
     };
     Ok(())
@@ -500,11 +572,12 @@ where
     //
     // Extract twophase state files
     //
-    async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
+    async fn add_twophase_file(&mut self, xid: TransactionId) -> Result<(), BasebackupError> {
         let img = self
             .timeline
             .get_twophase_file(xid, self.lsn, self.ctx)
-            .await?;
+            .await
+            .map_err(|e| BasebackupError::Server(e.into()))?;

         let mut buf = BytesMut::new();
         buf.extend_from_slice(&img[..]);
@@ -512,7 +585,10 @@ where
         buf.put_u32_le(crc);
         let path = format!("pg_twophase/{:>08X}", xid);
         let header = new_tar_header(&path, buf.len() as u64)?;
-        self.ar.append(&header, &buf[..]).await?;
+        self.ar
+            .append(&header, &buf[..])
+            .await
+            .map_err(BasebackupError::Client)?;

         Ok(())
     }
@@ -521,24 +597,28 @@ where
    //
    // Add generated pg_control file and bootstrap WAL segment.
    // Also send zenith.signal file with extra bootstrap data.
    //
-    async fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
+    async fn add_pgcontrol_file(&mut self) -> Result<(), BasebackupError> {
        // add zenith.signal file
        let mut zenith_signal = String::new();
        if self.prev_record_lsn == Lsn(0) {
            if self.lsn == self.timeline.get_ancestor_lsn() {
-                write!(zenith_signal, "PREV LSN: none")?;
+                write!(zenith_signal, "PREV LSN: none")
+                    .map_err(|e| BasebackupError::Server(e.into()))?;
            } else {
-                write!(zenith_signal, "PREV LSN: invalid")?;
+                write!(zenith_signal, "PREV LSN: invalid")
+                    .map_err(|e| BasebackupError::Server(e.into()))?;
            }
        } else {
-            write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
+            write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)
+                .map_err(|e| BasebackupError::Server(e.into()))?;
        }
        self.ar
            .append(
                &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
                zenith_signal.as_bytes(),
            )
-            .await?;
+            .await
+            .map_err(BasebackupError::Client)?;

        let checkpoint_bytes = self
            .timeline
@@ -560,7 +640,10 @@ where

        //send pg_control
        let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
-        self.ar.append(&header, &pg_control_bytes[..]).await?;
+        self.ar
+            .append(&header, &pg_control_bytes[..])
+            .await
+            .map_err(BasebackupError::Client)?;

        //send wal segment
        let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
@@ -575,8 +658,16 @@ where
            self.lsn,
        )
        .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
-        ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
-        self.ar.append(&header, &wal_seg[..]).await?;
+        if wal_seg.len() != WAL_SEGMENT_SIZE {
+            return Err(BasebackupError::Server(anyhow!(
+                "wal_seg.len() != WAL_SEGMENT_SIZE, wal_seg.len()={}",
+                wal_seg.len()
+            )));
+        }
+        self.ar
+            .append(&header, &wal_seg[..])
+            .await
+            .map_err(BasebackupError::Client)?;
        Ok(())
    }
 }
@@ -48,6 +48,7 @@ use utils::{

 use crate::auth::check_permission;
 use crate::basebackup;
+use crate::basebackup::BasebackupError;
 use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::import_datadir::import_wal_from_tar;
@@ -1236,6 +1237,13 @@ impl PageServerHandler {
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     {
+        fn map_basebackup_error(err: BasebackupError) -> QueryError {
+            match err {
+                BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
+                BasebackupError::Server(e) => QueryError::Other(e),
+            }
+        }
+
         let started = std::time::Instant::now();

         // check that the timeline exists
@@ -1261,7 +1269,8 @@ impl PageServerHandler {
         let lsn_awaited_after = started.elapsed();

         // switch client to COPYOUT
-        pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
+        pgb.write_message_noflush(&BeMessage::CopyOutResponse)
+            .map_err(QueryError::Disconnected)?;
         self.flush_cancellable(pgb, &timeline.cancel).await?;

         // Send a tarball of the latest layer on the timeline. Compress if not
@@ -1276,7 +1285,8 @@ impl PageServerHandler {
                 full_backup,
                 ctx,
             )
-            .await?;
+            .await
+            .map_err(map_basebackup_error)?;
         } else {
             let mut writer = pgb.copyout_writer();
             if gzip {
@@ -1297,9 +1307,13 @@ impl PageServerHandler {
                     full_backup,
                     ctx,
                 )
-                .await?;
+                .await
+                .map_err(map_basebackup_error)?;
                 // shutdown the encoder to ensure the gzip footer is written
-                encoder.shutdown().await?;
+                encoder
+                    .shutdown()
+                    .await
+                    .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
             } else {
                 basebackup::send_basebackup_tarball(
                     &mut writer,
@@ -1309,11 +1323,13 @@ impl PageServerHandler {
                     full_backup,
                     ctx,
                 )
-                .await?;
+                .await
+                .map_err(map_basebackup_error)?;
             }
         }

-        pgb.write_message_noflush(&BeMessage::CopyDone)?;
+        pgb.write_message_noflush(&BeMessage::CopyDone)
+            .map_err(QueryError::Disconnected)?;
         self.flush_cancellable(pgb, &timeline.cancel).await?;

         let basebackup_after = started
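The payoff of the whole Client/Server split is visible in `map_basebackup_error()` above: a dropped client surfaces as `QueryError::Disconnected` rather than as a generic internal error. A self-contained sketch of the distinction, assuming the `BasebackupError` enum from this diff (`classify()` is a hypothetical helper, not part of the change):

```rust
use std::io;

fn classify(err: &BasebackupError) -> &'static str {
    match err {
        BasebackupError::Client(_) => "client went away; log quietly, no alert",
        BasebackupError::Server(_) => "pageserver-side failure; investigate",
    }
}

fn main() {
    let disconnect = BasebackupError::Client(io::Error::new(
        io::ErrorKind::BrokenPipe,
        "peer closed the connection",
    ));
    let internal = BasebackupError::Server(anyhow::anyhow!("relmap file has wrong size"));
    println!("{}", classify(&disconnect));
    println!("{}", classify(&internal));
}
```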
@@ -401,8 +401,8 @@ impl Layer {
         &self.0.path
     }

-    pub(crate) fn local_path_str(&self) -> &Arc<str> {
-        &self.0.path_str
+    pub(crate) fn debug_str(&self) -> &Arc<str> {
+        &self.0.debug_str
     }

     pub(crate) fn metadata(&self) -> LayerFileMetadata {
@@ -527,8 +527,8 @@ struct LayerInner {
     /// Full path to the file; unclear if this should exist anymore.
     path: Utf8PathBuf,

-    /// String representation of the full path, used for traversal id.
-    path_str: Arc<str>,
+    /// String representation of the layer, used for traversal id.
+    debug_str: Arc<str>,

     desc: PersistentLayerDesc,

@@ -735,7 +735,7 @@ impl LayerInner {

         LayerInner {
             conf,
-            path_str: path.to_string().into(),
+            debug_str: { format!("timelines/{}/{}", timeline.timeline_id, desc.filename()).into() },
             path,
             desc,
             timeline: Arc::downgrade(timeline),

@@ -2948,7 +2948,7 @@ trait TraversalLayerExt {

 impl TraversalLayerExt for Layer {
     fn traversal_id(&self) -> TraversalId {
-        Arc::clone(self.local_path_str())
+        Arc::clone(self.debug_str())
     }
 }

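The last change swaps the traversal id from the layer's full local path to a location-independent "timelines/<timeline_id>/<layer file name>" string. A tiny sketch of the new shape, with placeholder values:

```rust
use std::sync::Arc;

fn main() {
    let timeline_id = "<timeline-id>";    // placeholder
    let layer_file_name = "<layer-file>"; // placeholder
    let debug_str: Arc<str> =
        format!("timelines/{timeline_id}/{layer_file_name}").into();
    // Unlike the old path_str, this string does not change if the
    // layer's on-disk directory moves.
    println!("{debug_str}");
}
```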