From e50b914a8eefa35a79a64bdb7715c0c102f94381 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 27 Jun 2025 18:39:00 +0200 Subject: [PATCH] compute_tools: support gRPC base backups in `compute_ctl` (#12244) ## Problem `compute_ctl` should support gRPC base backups. Requires #12111. Requires #12243. Touches #11926. ## Summary of changes Support `grpc://` connstrings for `compute_ctl` base backups. --- Cargo.lock | 6 +- Cargo.toml | 2 +- build-tools.Dockerfile | 1 + compute/compute-node.Dockerfile | 15 ++- compute_tools/Cargo.toml | 2 + compute_tools/src/compute.rs | 105 +++++++++++++++++---- pageserver/page_api/Cargo.toml | 3 +- pageserver/page_api/src/client.rs | 37 +++----- pageserver/page_api/src/model.rs | 4 +- pageserver/pagebench/src/cmd/basebackup.rs | 5 +- workspace_hack/Cargo.toml | 4 +- 11 files changed, 128 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7098711bb4..71e78243a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1316,6 +1316,7 @@ dependencies = [ "opentelemetry", "opentelemetry_sdk", "p256 0.13.2", + "pageserver_page_api", "postgres", "postgres_initdb", "postgres_versioninfo", @@ -1335,6 +1336,7 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tokio-util", + "tonic 0.13.1", "tower 0.5.2", "tower-http", "tower-otel", @@ -4475,12 +4477,13 @@ dependencies = [ "bytes", "futures", "pageserver_api", - "postgres_ffi", + "postgres_ffi_types", "prost 0.13.5", "strum", "strum_macros", "thiserror 1.0.69", "tokio", + "tokio-util", "tonic 0.13.1", "tonic-build", "utils", @@ -8679,7 +8682,6 @@ dependencies = [ "num-iter", "num-rational", "num-traits", - "once_cell", "p256 0.13.2", "parquet", "prettyplease", diff --git a/Cargo.toml b/Cargo.toml index 857bc5d5d9..aeb7976b6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -199,7 +199,7 @@ tokio-postgres-rustls = "0.12.0" tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]} tokio-stream = "0.1" tokio-tar = "0.3" -tokio-util = { version = "0.7.10", features = ["io", "rt"] } +tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] } toml = "0.8" toml_edit = "0.22" tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots", "zstd"] } diff --git a/build-tools.Dockerfile b/build-tools.Dockerfile index b70ced7886..14a52bd736 100644 --- a/build-tools.Dockerfile +++ b/build-tools.Dockerfile @@ -165,6 +165,7 @@ RUN curl -fsSL \ && rm sql_exporter.tar.gz # protobuf-compiler (protoc) +# Keep the version the same as in compute/compute-node.Dockerfile ENV PROTOC_VERSION=25.1 RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-$(uname -m | sed 's/aarch64/aarch_64/g').zip" -o "protoc.zip" \ && unzip -q protoc.zip -d protoc \ diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile index 35ece73030..bce2a28b8b 100644 --- a/compute/compute-node.Dockerfile +++ b/compute/compute-node.Dockerfile @@ -115,6 +115,9 @@ ARG EXTENSIONS=all FROM $BASE_IMAGE_SHA AS build-deps ARG DEBIAN_VERSION +# Keep in sync with build-tools.Dockerfile +ENV PROTOC_VERSION=25.1 + # Use strict mode for bash to catch errors early SHELL ["/bin/bash", "-euo", "pipefail", "-c"] @@ -149,8 +152,14 @@ RUN case $DEBIAN_VERSION in \ libclang-dev \ jsonnet \ $VERSION_INSTALLS \ - && apt clean && rm -rf /var/lib/apt/lists/* && \ - useradd -ms /bin/bash nonroot -b /home + && apt clean && rm -rf /var/lib/apt/lists/* \ + && useradd -ms /bin/bash nonroot -b /home \ + # Install protoc from binary release, since Debian's versions are too old. + && curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-$(uname -m | sed 's/aarch64/aarch_64/g').zip" -o "protoc.zip" \ + && unzip -q protoc.zip -d protoc \ + && mv protoc/bin/protoc /usr/local/bin/protoc \ + && mv protoc/include/google /usr/local/include/google \ + && rm -rf protoc.zip protoc ######################################################################################### # @@ -1170,7 +1179,7 @@ COPY --from=pgrag-src /ext-src/ /ext-src/ # Install it using virtual environment, because Python 3.11 (the default version on Debian 12 (Bookworm)) complains otherwise WORKDIR /ext-src/onnxruntime-src RUN apt update && apt install --no-install-recommends --no-install-suggests -y \ - python3 python3-pip python3-venv protobuf-compiler && \ + python3 python3-pip python3-venv && \ apt clean && rm -rf /var/lib/apt/lists/* && \ python3 -m venv venv && \ . venv/bin/activate && \ diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index a5879c4b7c..0a071c1ad1 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -38,6 +38,7 @@ once_cell.workspace = true opentelemetry.workspace = true opentelemetry_sdk.workspace = true p256 = { version = "0.13", features = ["pem"] } +pageserver_page_api.workspace = true postgres.workspace = true regex.workspace = true reqwest = { workspace = true, features = ["json"] } @@ -53,6 +54,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tokio-postgres.workspace = true tokio-util.workspace = true tokio-stream.workspace = true +tonic.workspace = true tower-otel.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index cf558ee01a..7566626d57 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,4 +1,4 @@ -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; use chrono::{DateTime, Utc}; use compute_api::privilege::Privilege; use compute_api::responses::{ @@ -15,6 +15,7 @@ use itertools::Itertools; use nix::sys::signal::{Signal, kill}; use nix::unistd::Pid; use once_cell::sync::Lazy; +use pageserver_page_api::{self as page_api, BaseBackupCompression}; use postgres; use postgres::NoTls; use postgres::error::SqlState; @@ -35,6 +36,7 @@ use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use utils::measured_stream::MeasuredReader; use utils::pid_file; +use utils::shard::{ShardCount, ShardIndex, ShardNumber}; use crate::configurator::launch_configurator; use crate::disk_quota::set_disk_quota; @@ -995,13 +997,87 @@ impl ComputeNode { Ok(()) } - // Get basebackup from the libpq connection to pageserver using `connstr` and - // unarchive it to `pgdata` directory overriding all its previous content. + /// Fetches a basebackup from the Pageserver using the compute state's Pageserver connstring and + /// unarchives it to `pgdata` directory, replacing any existing contents. #[instrument(skip_all, fields(%lsn))] fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> { let spec = compute_state.pspec.as_ref().expect("spec must be set"); - let start_time = Instant::now(); + // Detect the protocol scheme. If the URL doesn't have a scheme, assume libpq. + let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap(); + let scheme = match Url::parse(shard0_connstr) { + Ok(url) => url.scheme().to_lowercase().to_string(), + Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(), + Err(err) => return Err(anyhow!("invalid connstring URL: {err}")), + }; + + let started = Instant::now(); + let (connected, size) = match scheme.as_str() { + "postgresql" | "postgres" => self.try_get_basebackup_libpq(spec, lsn)?, + "grpc" => self.try_get_basebackup_grpc(spec, lsn)?, + scheme => return Err(anyhow!("unknown URL scheme {scheme}")), + }; + + let mut state = self.state.lock().unwrap(); + state.metrics.pageserver_connect_micros = + connected.duration_since(started).as_micros() as u64; + state.metrics.basebackup_bytes = size as u64; + state.metrics.basebackup_ms = started.elapsed().as_millis() as u64; + + Ok(()) + } + + /// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when + /// the connection was established, and the (compressed) size of the basebackup. + fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> { + let shard0_connstr = spec + .pageserver_connstr + .split(',') + .next() + .unwrap() + .to_string(); + let shard_index = match spec.pageserver_connstr.split(',').count() as u8 { + 0 | 1 => ShardIndex::unsharded(), + count => ShardIndex::new(ShardNumber(0), ShardCount(count)), + }; + + let (reader, connected) = tokio::runtime::Handle::current().block_on(async move { + let mut client = page_api::Client::new( + shard0_connstr, + spec.tenant_id, + spec.timeline_id, + shard_index, + spec.storage_auth_token.clone(), + None, // NB: base backups use payload compression + ) + .await?; + let connected = Instant::now(); + let reader = client + .get_base_backup(page_api::GetBaseBackupRequest { + lsn: (lsn != Lsn(0)).then_some(lsn), + compression: BaseBackupCompression::Gzip, + replica: spec.spec.mode != ComputeMode::Primary, + full: false, + }) + .await?; + anyhow::Ok((reader, connected)) + })?; + + let mut reader = MeasuredReader::new(tokio_util::io::SyncIoBridge::new(reader)); + + // Set `ignore_zeros` so that unpack() reads the entire stream and doesn't just stop at the + // end-of-archive marker. If the server errors, the tar::Builder drop handler will write an + // end-of-archive marker before the error is emitted, and we would not see the error. + let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut reader)); + ar.set_ignore_zeros(true); + ar.unpack(&self.params.pgdata)?; + + Ok((connected, reader.get_byte_count())) + } + + /// Fetches a basebackup via libpq. The connstring must use postgresql://. Returns the timestamp + /// when the connection was established, and the (compressed) size of the basebackup. + fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> { let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap(); let mut config = postgres::Config::from_str(shard0_connstr)?; @@ -1015,16 +1091,14 @@ impl ComputeNode { } config.application_name("compute_ctl"); - if let Some(spec) = &compute_state.pspec { - config.options(&format!( - "-c neon.compute_mode={}", - spec.spec.mode.to_type_str() - )); - } + config.options(&format!( + "-c neon.compute_mode={}", + spec.spec.mode.to_type_str() + )); // Connect to pageserver let mut client = config.connect(NoTls)?; - let pageserver_connect_micros = start_time.elapsed().as_micros() as u64; + let connected = Instant::now(); let basebackup_cmd = match lsn { Lsn(0) => { @@ -1061,16 +1135,13 @@ impl ComputeNode { // Set `ignore_zeros` so that unpack() reads all the Copy data and // doesn't stop at the end-of-archive marker. Otherwise, if the server // sends an Error after finishing the tarball, we will not notice it. + // The tar::Builder drop handler will write an end-of-archive marker + // before emitting the error, and we would not see it otherwise. let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader)); ar.set_ignore_zeros(true); ar.unpack(&self.params.pgdata)?; - // Report metrics - let mut state = self.state.lock().unwrap(); - state.metrics.pageserver_connect_micros = pageserver_connect_micros; - state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64; - state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64; - Ok(()) + Ok((connected, measured_reader.get_byte_count())) } // Gets the basebackup in a retry loop diff --git a/pageserver/page_api/Cargo.toml b/pageserver/page_api/Cargo.toml index c5283c2b09..42ee9b50e9 100644 --- a/pageserver/page_api/Cargo.toml +++ b/pageserver/page_api/Cargo.toml @@ -9,12 +9,13 @@ anyhow.workspace = true bytes.workspace = true futures.workspace = true pageserver_api.workspace = true -postgres_ffi.workspace = true +postgres_ffi_types.workspace = true prost.workspace = true strum.workspace = true strum_macros.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-util.workspace = true tonic.workspace = true utils.workspace = true workspace_hack.workspace = true diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs index 71d539ab91..4b456787d2 100644 --- a/pageserver/page_api/src/client.rs +++ b/pageserver/page_api/src/client.rs @@ -1,8 +1,7 @@ -use std::convert::TryInto; - -use bytes::Bytes; -use futures::TryStreamExt; -use futures::{Stream, StreamExt}; +use anyhow::Result; +use futures::{Stream, StreamExt as _, TryStreamExt as _}; +use tokio::io::AsyncRead; +use tokio_util::io::StreamReader; use tonic::metadata::AsciiMetadataValue; use tonic::metadata::errors::InvalidMetadataValue; use tonic::transport::Channel; @@ -12,8 +11,6 @@ use utils::id::TenantId; use utils::id::TimelineId; use utils::shard::ShardIndex; -use anyhow::Result; - use crate::model; use crate::proto; @@ -69,6 +66,7 @@ impl tonic::service::Interceptor for AuthInterceptor { Ok(req) } } + #[derive(Clone)] pub struct Client { client: proto::PageServiceClient< @@ -120,22 +118,15 @@ impl Client { pub async fn get_base_backup( &mut self, req: model::GetBaseBackupRequest, - ) -> Result> + 'static, tonic::Status> { - let proto_req = proto::GetBaseBackupRequest::from(req); - - let response_stream: Streaming = - self.client.get_base_backup(proto_req).await?.into_inner(); - - // TODO: Consider dechunking internally - let domain_stream = response_stream.map(|chunk_res| { - chunk_res.and_then(|proto_chunk| { - proto_chunk.try_into().map_err(|e| { - tonic::Status::internal(format!("Failed to convert response chunk: {e}")) - }) - }) - }); - - Ok(domain_stream) + ) -> Result, tonic::Status> { + let req = proto::GetBaseBackupRequest::from(req); + let chunks = self.client.get_base_backup(req).await?.into_inner(); + let reader = StreamReader::new( + chunks + .map_ok(|resp| resp.chunk) + .map_err(std::io::Error::other), + ); + Ok(reader) } /// Returns the total size of a database, as # of bytes. diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 1ca89b4870..0493f79781 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -18,8 +18,8 @@ use std::fmt::Display; use bytes::Bytes; -use postgres_ffi::Oid; -// TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid +use postgres_ffi_types::Oid; +// TODO: split out Lsn, RelTag, SlruKind and other basic types to a separate crate, to avoid // pulling in all of their other crate dependencies when building the client. use utils::lsn::Lsn; diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 4111d09f92..4b7a70504a 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -355,9 +355,6 @@ impl Client for GrpcClient { full: false, compression: self.compression, }; - let stream = self.inner.get_base_backup(req).await?; - Ok(Box::pin(StreamReader::new( - stream.map_err(std::io::Error::other), - ))) + Ok(Box::pin(self.inner.get_base_backup(req).await?)) } } diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index b74df50f86..e9a77ca2d6 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -68,7 +68,6 @@ num-integer = { version = "0.1", features = ["i128"] } num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] } num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] } num-traits = { version = "0.2", features = ["i128", "libm"] } -once_cell = { version = "1" } p256 = { version = "0.13", features = ["jwk"] } parquet = { version = "53", default-features = false, features = ["zstd"] } prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] } @@ -97,7 +96,7 @@ time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["full", "test-util"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] } tokio-stream = { version = "0.1", features = ["net"] } -tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] } +tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] } toml_edit = { version = "0.22", features = ["serde"] } tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] } tracing = { version = "0.1", features = ["log"] } @@ -134,7 +133,6 @@ num-integer = { version = "0.1", features = ["i128"] } num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] } num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] } num-traits = { version = "0.2", features = ["i128", "libm"] } -once_cell = { version = "1" } parquet = { version = "53", default-features = false, features = ["zstd"] } prettyplease = { version = "0.2", default-features = false, features = ["verbatim"] } proc-macro2 = { version = "1" }