mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
Handle gRPC basebackups in compute_ctl
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1424,6 +1424,7 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"p256 0.13.2",
|
||||
"pageserver_page_api",
|
||||
"postgres",
|
||||
"postgres_initdb",
|
||||
"regex",
|
||||
@@ -1442,6 +1443,7 @@ dependencies = [
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tower 0.5.2",
|
||||
"tower-http",
|
||||
"tower-otel",
|
||||
|
||||
@@ -201,7 +201,7 @@ tokio-postgres-rustls = "0.12.0"
|
||||
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
|
||||
tokio-stream = "0.1"
|
||||
tokio-tar = "0.3"
|
||||
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] }
|
||||
toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
|
||||
|
||||
@@ -38,6 +38,7 @@ once_cell.workspace = true
|
||||
opentelemetry.workspace = true
|
||||
opentelemetry_sdk.workspace = true
|
||||
p256 = { version = "0.13", features = ["pem"] }
|
||||
pageserver_page_api.workspace = true
|
||||
postgres.workspace = true
|
||||
regex.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
@@ -53,6 +54,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tokio-postgres.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tonic.workspace = true
|
||||
tower-otel.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use anyhow::{Context, Result};
|
||||
use anyhow::{Context, Result, anyhow};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{
|
||||
@@ -15,6 +15,7 @@ use itertools::Itertools;
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::Pid;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_page_api as page_api;
|
||||
use postgres;
|
||||
use postgres::NoTls;
|
||||
use postgres::error::SqlState;
|
||||
@@ -29,7 +30,9 @@ use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{env, fs};
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::spawn;
|
||||
use tokio_util::io::StreamReader;
|
||||
use tracing::{Instrument, debug, error, info, instrument, warn};
|
||||
use url::Url;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -369,7 +372,7 @@ impl ComputeNode {
|
||||
|
||||
let mut new_state = ComputeState::new();
|
||||
if let Some(spec) = config.spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow!(msg))?;
|
||||
new_state.pspec = Some(pspec);
|
||||
}
|
||||
|
||||
@@ -941,6 +944,77 @@ impl ComputeNode {
|
||||
#[instrument(skip_all, fields(%lsn))]
|
||||
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
|
||||
|
||||
match Url::parse(shard0_connstr)?.scheme() {
|
||||
"postgres" | "postgresql" => self.try_get_basebackup_libpq(spec, lsn),
|
||||
"grpc" => self.try_get_basebackup_grpc(spec, lsn),
|
||||
scheme => return Err(anyhow!("unknown URL scheme {scheme}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let shard0_connstr = spec
|
||||
.pageserver_connstr
|
||||
.split(',')
|
||||
.next()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
let chunks = tokio::runtime::Handle::current().block_on(async move {
|
||||
let mut client = page_api::proto::PageServiceClient::connect(shard0_connstr).await?;
|
||||
|
||||
let req = page_api::proto::GetBaseBackupRequest {
|
||||
read_lsn: Some(page_api::proto::ReadLsn {
|
||||
request_lsn: lsn.0,
|
||||
not_modified_since_lsn: 0,
|
||||
}),
|
||||
replica: false, // TODO: handle replicas, with LSN 0
|
||||
};
|
||||
let mut req = tonic::Request::new(req);
|
||||
let metadata = req.metadata_mut();
|
||||
metadata.insert("neon-tenant-id", spec.tenant_id.to_string().parse()?);
|
||||
metadata.insert("neon-timeline-id", spec.timeline_id.to_string().parse()?);
|
||||
metadata.insert("neon-shard-id", "0000".to_string().parse()?); // TODO: shard count
|
||||
if let Some(auth) = spec.storage_auth_token.as_ref() {
|
||||
metadata.insert("authorization", format!("Bearer {auth}").parse()?);
|
||||
}
|
||||
|
||||
let chunks = client.get_base_backup(req).await?.into_inner();
|
||||
anyhow::Ok(chunks)
|
||||
})?;
|
||||
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
|
||||
|
||||
// Convert the chunks stream into an AsyncRead
|
||||
let stream_reader = StreamReader::new(
|
||||
chunks.map(|chunk| chunk.map(|c| c.chunk).map_err(std::io::Error::other)),
|
||||
);
|
||||
|
||||
// Wrap the AsyncRead into a blocking reader for compatibility with tar::Archive
|
||||
let reader = tokio_util::io::SyncIoBridge::new(stream_reader);
|
||||
let mut measured_reader = MeasuredReader::new(reader);
|
||||
let mut bufreader = std::io::BufReader::new(&mut measured_reader);
|
||||
|
||||
// Read the archive directly from the `CopyOutReader`
|
||||
//
|
||||
// Set `ignore_zeros` so that unpack() reads all the Copy data and
|
||||
// doesn't stop at the end-of-archive marker. Otherwise, if the server
|
||||
// sends an Error after finishing the tarball, we will not notice it.
|
||||
let mut ar = tar::Archive::new(&mut bufreader);
|
||||
ar.set_ignore_zeros(true);
|
||||
ar.unpack(&self.params.pgdata)?;
|
||||
|
||||
// Report metrics
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.pageserver_connect_micros = pageserver_connect_micros;
|
||||
state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
|
||||
state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
|
||||
@@ -956,12 +1030,10 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
config.application_name("compute_ctl");
|
||||
if let Some(spec) = &compute_state.pspec {
|
||||
config.options(&format!(
|
||||
"-c neon.compute_mode={}",
|
||||
spec.spec.mode.to_type_str()
|
||||
));
|
||||
}
|
||||
config.options(&format!(
|
||||
"-c neon.compute_mode={}",
|
||||
spec.spec.mode.to_type_str()
|
||||
));
|
||||
|
||||
// Connect to pageserver
|
||||
let mut client = config.connect(NoTls)?;
|
||||
@@ -1035,10 +1107,7 @@ impl ComputeNode {
|
||||
return result;
|
||||
}
|
||||
Err(ref e) if attempts < max_attempts => {
|
||||
warn!(
|
||||
"Failed to get basebackup: {} (attempt {}/{})",
|
||||
e, attempts, max_attempts
|
||||
);
|
||||
warn!("Failed to get basebackup: {e:?} (attempt {attempts}/{max_attempts})");
|
||||
std::thread::sleep(std::time::Duration::from_millis(retry_period_ms as u64));
|
||||
retry_period_ms *= 1.5;
|
||||
}
|
||||
@@ -1916,7 +1985,7 @@ LIMIT 100",
|
||||
self.params
|
||||
.remote_ext_base_url
|
||||
.as_ref()
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
.ok_or(DownloadError::BadInput(anyhow!(
|
||||
"Remote extensions storage is not configured",
|
||||
)))?;
|
||||
|
||||
@@ -2112,7 +2181,7 @@ LIMIT 100",
|
||||
let remote_extensions = spec
|
||||
.remote_extensions
|
||||
.as_ref()
|
||||
.ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
|
||||
.ok_or(anyhow!("Remote extensions are not configured"))?;
|
||||
|
||||
info!("parse shared_preload_libraries from spec.cluster.settings");
|
||||
let mut libs_vec = Vec::new();
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
//! provide it by calling the compute_ctl's `/compute_ctl` endpoint, or
|
||||
//! compute_ctl can fetch it by calling the control plane's API.
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
|
||||
use indexmap::IndexMap;
|
||||
use regex::Regex;
|
||||
@@ -319,6 +320,12 @@ impl ComputeMode {
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ComputeMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(self.to_type_str())
|
||||
}
|
||||
}
|
||||
|
||||
/// Log level for audit logging
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
|
||||
pub enum ComputeAudit {
|
||||
|
||||
@@ -195,11 +195,25 @@ impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
// Allow 0 read_lsn for base backups.
|
||||
// TODO: reconsider requiring request_lsn > 0.
|
||||
let zero = proto::ReadLsn {
|
||||
request_lsn: 0,
|
||||
not_modified_since_lsn: 0,
|
||||
};
|
||||
let read_lsn = if pb.read_lsn == Some(zero) || pb.read_lsn.is_none() {
|
||||
ReadLsn {
|
||||
request_lsn: Lsn(0),
|
||||
not_modified_since_lsn: None,
|
||||
}
|
||||
} else {
|
||||
pb.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
.try_into()?
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
read_lsn,
|
||||
replica: pb.replica,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::{io, str};
|
||||
|
||||
use anyhow::{Context as _, anyhow, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::{Buf, BytesMut};
|
||||
use bytes::{Buf, BufMut as _, BytesMut};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{FutureExt, Stream};
|
||||
use itertools::Itertools;
|
||||
@@ -3610,20 +3610,24 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
|
||||
span_record!(lsn=%req.read_lsn);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
timeline
|
||||
.wait_lsn(
|
||||
req.read_lsn.request_lsn,
|
||||
WaitLsnWaiter::PageService,
|
||||
WaitLsnTimeout::Default,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
timeline
|
||||
.check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn)
|
||||
.map_err(|err| {
|
||||
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
|
||||
})?;
|
||||
let mut lsn = None;
|
||||
if req.read_lsn.request_lsn > Lsn(0) {
|
||||
lsn = Some(req.read_lsn.request_lsn);
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
timeline
|
||||
.wait_lsn(
|
||||
req.read_lsn.request_lsn,
|
||||
WaitLsnWaiter::PageService,
|
||||
WaitLsnTimeout::Default,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
timeline
|
||||
.check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn)
|
||||
.map_err(|err| {
|
||||
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
|
||||
})?;
|
||||
}
|
||||
|
||||
// Spawn a task to run the basebackup.
|
||||
//
|
||||
@@ -3634,7 +3638,7 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
let result = basebackup::send_basebackup_tarball(
|
||||
&mut simplex_write,
|
||||
&timeline,
|
||||
Some(req.read_lsn.request_lsn),
|
||||
lsn,
|
||||
None,
|
||||
false,
|
||||
req.replica,
|
||||
@@ -3650,20 +3654,21 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
|
||||
// Emit chunks of size CHUNK_SIZE.
|
||||
let chunks = async_stream::try_stream! {
|
||||
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE);
|
||||
loop {
|
||||
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
|
||||
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
|
||||
})?;
|
||||
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
|
||||
let mut n = 1;
|
||||
while n != 0 {
|
||||
n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
|
||||
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
|
||||
})?;
|
||||
}
|
||||
let chunk = chunk.into_inner();
|
||||
|
||||
// If we read 0 bytes, either the chunk is full or the stream is closed.
|
||||
if n == 0 {
|
||||
if chunk.is_empty() {
|
||||
break;
|
||||
}
|
||||
yield proto::GetBaseBackupResponseChunk::from(chunk.clone().freeze());
|
||||
chunk.clear();
|
||||
if chunk.is_empty() {
|
||||
break;
|
||||
}
|
||||
yield proto::GetBaseBackupResponseChunk::from(chunk.freeze());
|
||||
}
|
||||
// Wait for the basebackup task to exit and check for errors.
|
||||
jh.await.map_err(|err| {
|
||||
|
||||
Reference in New Issue
Block a user