compute_tools: support gRPC base backups in compute_ctl (#12244)

## Problem

`compute_ctl` should support gRPC base backups.

Requires #12111.
Requires #12243.
Touches #11926.

## Summary of changes

Support `grpc://` connstrings for `compute_ctl` base backups. The protocol is chosen from the scheme of the shard 0 connstring: `postgresql://`, `postgres://`, or a scheme-less connstring keeps the existing libpq path, while `grpc://` uses the new gRPC page API client.
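
For illustration, here is a minimal, self-contained sketch of the dispatch rule this adds. The `BackupProtocol` enum and `detect_protocol` helper are hypothetical names invented for this example; the actual implementation is `ComputeNode::try_get_basebackup` in the diff below.

```rust
use url::{ParseError, Url};

/// Hypothetical enum for this sketch: which protocol serves the base backup.
#[derive(Debug, PartialEq)]
enum BackupProtocol {
    Libpq,
    Grpc,
}

/// Mirrors the scheme detection in `try_get_basebackup`: shard 0 of a
/// comma-separated connstring list is parsed as a URL, and a connstring
/// without a scheme falls back to libpq.
fn detect_protocol(pageserver_connstr: &str) -> Result<BackupProtocol, String> {
    // split() always yields at least one element, so unwrap() is safe.
    let shard0_connstr = pageserver_connstr.split(',').next().unwrap();
    match Url::parse(shard0_connstr) {
        Ok(url) => match url.scheme().to_lowercase().as_str() {
            "postgresql" | "postgres" => Ok(BackupProtocol::Libpq),
            "grpc" => Ok(BackupProtocol::Grpc),
            scheme => Err(format!("unknown URL scheme {scheme}")),
        },
        // No scheme at all (e.g. "host=ps-0 port=6400"): assume libpq.
        Err(ParseError::RelativeUrlWithoutBase) => Ok(BackupProtocol::Libpq),
        Err(err) => Err(format!("invalid connstring URL: {err}")),
    }
}

fn main() {
    // Example connstrings (made-up hosts and ports).
    assert_eq!(
        detect_protocol("postgresql://pageserver-0:6400"),
        Ok(BackupProtocol::Libpq)
    );
    assert_eq!(
        detect_protocol("grpc://pageserver-0:51051,grpc://pageserver-1:51051"),
        Ok(BackupProtocol::Grpc)
    );
}
```
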
Commit e50b914a8e (parent e33e109403) by Erik Grinaker, 2025-06-27 18:39:00 +02:00, committed by GitHub.
11 changed files with 128 additions and 56 deletions.

compute_tools/Cargo.toml

@@ -38,6 +38,7 @@ once_cell.workspace = true
 opentelemetry.workspace = true
 opentelemetry_sdk.workspace = true
 p256 = { version = "0.13", features = ["pem"] }
+pageserver_page_api.workspace = true
 postgres.workspace = true
 regex.workspace = true
 reqwest = { workspace = true, features = ["json"] }
@@ -53,6 +54,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
 tokio-postgres.workspace = true
 tokio-util.workspace = true
 tokio-stream.workspace = true
+tonic.workspace = true
 tower-otel.workspace = true
 tracing.workspace = true
 tracing-opentelemetry.workspace = true

compute_tools/src/compute.rs

@@ -1,4 +1,4 @@
-use anyhow::{Context, Result};
+use anyhow::{Context, Result, anyhow};
 use chrono::{DateTime, Utc};
 use compute_api::privilege::Privilege;
 use compute_api::responses::{
@@ -15,6 +15,7 @@ use itertools::Itertools;
 use nix::sys::signal::{Signal, kill};
 use nix::unistd::Pid;
 use once_cell::sync::Lazy;
+use pageserver_page_api::{self as page_api, BaseBackupCompression};
 use postgres;
 use postgres::NoTls;
 use postgres::error::SqlState;
@@ -35,6 +36,7 @@ use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 use utils::measured_stream::MeasuredReader;
 use utils::pid_file;
+use utils::shard::{ShardCount, ShardIndex, ShardNumber};
 
 use crate::configurator::launch_configurator;
 use crate::disk_quota::set_disk_quota;
@@ -995,13 +997,87 @@ impl ComputeNode {
         Ok(())
     }
 
-    // Get basebackup from the libpq connection to pageserver using `connstr` and
-    // unarchive it to `pgdata` directory overriding all its previous content.
+    /// Fetches a basebackup from the Pageserver using the compute state's Pageserver connstring and
+    /// unarchives it to `pgdata` directory, replacing any existing contents.
     #[instrument(skip_all, fields(%lsn))]
     fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
         let spec = compute_state.pspec.as_ref().expect("spec must be set");
-        let start_time = Instant::now();
+
+        // Detect the protocol scheme. If the URL doesn't have a scheme, assume libpq.
+        let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
+        let scheme = match Url::parse(shard0_connstr) {
+            Ok(url) => url.scheme().to_lowercase().to_string(),
+            Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(),
+            Err(err) => return Err(anyhow!("invalid connstring URL: {err}")),
+        };
+
+        let started = Instant::now();
+        let (connected, size) = match scheme.as_str() {
+            "postgresql" | "postgres" => self.try_get_basebackup_libpq(spec, lsn)?,
+            "grpc" => self.try_get_basebackup_grpc(spec, lsn)?,
+            scheme => return Err(anyhow!("unknown URL scheme {scheme}")),
+        };
+
+        let mut state = self.state.lock().unwrap();
+        state.metrics.pageserver_connect_micros =
+            connected.duration_since(started).as_micros() as u64;
+        state.metrics.basebackup_bytes = size as u64;
+        state.metrics.basebackup_ms = started.elapsed().as_millis() as u64;
+        Ok(())
+    }
+
+    /// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when
+    /// the connection was established, and the (compressed) size of the basebackup.
+    fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
+        let shard0_connstr = spec
+            .pageserver_connstr
+            .split(',')
+            .next()
+            .unwrap()
+            .to_string();
+        let shard_index = match spec.pageserver_connstr.split(',').count() as u8 {
+            0 | 1 => ShardIndex::unsharded(),
+            count => ShardIndex::new(ShardNumber(0), ShardCount(count)),
+        };
+
+        let (reader, connected) = tokio::runtime::Handle::current().block_on(async move {
+            let mut client = page_api::Client::new(
+                shard0_connstr,
+                spec.tenant_id,
+                spec.timeline_id,
+                shard_index,
+                spec.storage_auth_token.clone(),
+                None, // NB: base backups use payload compression
+            )
+            .await?;
+            let connected = Instant::now();
+            let reader = client
+                .get_base_backup(page_api::GetBaseBackupRequest {
+                    lsn: (lsn != Lsn(0)).then_some(lsn),
+                    compression: BaseBackupCompression::Gzip,
+                    replica: spec.spec.mode != ComputeMode::Primary,
+                    full: false,
+                })
+                .await?;
+            anyhow::Ok((reader, connected))
+        })?;
+
+        let mut reader = MeasuredReader::new(tokio_util::io::SyncIoBridge::new(reader));
+
+        // Set `ignore_zeros` so that unpack() reads the entire stream and doesn't just stop at the
+        // end-of-archive marker. If the server errors, the tar::Builder drop handler will write an
+        // end-of-archive marker before the error is emitted, and we would not see the error.
+        let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut reader));
+        ar.set_ignore_zeros(true);
+        ar.unpack(&self.params.pgdata)?;
+
+        Ok((connected, reader.get_byte_count()))
+    }
+
+    /// Fetches a basebackup via libpq. The connstring must use postgresql://. Returns the timestamp
+    /// when the connection was established, and the (compressed) size of the basebackup.
+    fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
+        let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
         let mut config = postgres::Config::from_str(shard0_connstr)?;
@@ -1015,16 +1091,14 @@ impl ComputeNode {
         }
         config.application_name("compute_ctl");
 
-        if let Some(spec) = &compute_state.pspec {
-            config.options(&format!(
-                "-c neon.compute_mode={}",
-                spec.spec.mode.to_type_str()
-            ));
-        }
+        config.options(&format!(
+            "-c neon.compute_mode={}",
+            spec.spec.mode.to_type_str()
+        ));
 
         // Connect to pageserver
         let mut client = config.connect(NoTls)?;
-        let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
+        let connected = Instant::now();
 
         let basebackup_cmd = match lsn {
             Lsn(0) => {
@@ -1061,16 +1135,13 @@ impl ComputeNode {
         // Set `ignore_zeros` so that unpack() reads all the Copy data and
         // doesn't stop at the end-of-archive marker. Otherwise, if the server
         // sends an Error after finishing the tarball, we will not notice it.
+        // The tar::Builder drop handler will write an end-of-archive marker
+        // before emitting the error, and we would not see it otherwise.
         let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
         ar.set_ignore_zeros(true);
         ar.unpack(&self.params.pgdata)?;
 
-        // Report metrics
-        let mut state = self.state.lock().unwrap();
-        state.metrics.pageserver_connect_micros = pageserver_connect_micros;
-        state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
-        state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
-        Ok(())
+        Ok((connected, measured_reader.get_byte_count()))
     }
 
     // Gets the basebackup in a retry loop
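
One detail worth calling out: both the libpq path and the new gRPC path set `ignore_zeros` on the tar archive so that a server error arriving after the end-of-archive marker is not silently swallowed. Here is a minimal sketch of that unpack pattern using the same `tar` and `flate2` crates; `backup.tar.gz` and `./pgdata` are made-up example paths.

```rust
use std::fs::File;

use flate2::read::GzDecoder;
use tar::Archive;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any `Read` impl works here; compute_ctl wraps its pageserver stream
    // the same way. This sketch just reads a local file.
    let file = File::open("backup.tar.gz")?;
    let mut ar = Archive::new(GzDecoder::new(file));
    // Keep reading past the end-of-archive marker (two all-zero blocks);
    // otherwise a trailing error from the sender could go unnoticed.
    ar.set_ignore_zeros(true);
    ar.unpack("./pgdata")?;
    Ok(())
}
```
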