diff --git a/Cargo.lock b/Cargo.lock index 78cab72c10..9616d1e075 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1424,6 +1424,7 @@ dependencies = [ "opentelemetry", "opentelemetry_sdk", "p256 0.13.2", + "pageserver_page_api", "postgres", "postgres_initdb", "regex", @@ -1442,6 +1443,7 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tokio-util", + "tonic 0.13.1", "tower 0.5.2", "tower-http", "tower-otel", diff --git a/Cargo.toml b/Cargo.toml index 4863afe142..529caab409 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -201,7 +201,7 @@ tokio-postgres-rustls = "0.12.0" tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]} tokio-stream = "0.1" tokio-tar = "0.3" -tokio-util = { version = "0.7.10", features = ["io", "rt"] } +tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] } toml = "0.8" toml_edit = "0.22" tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots"] } diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index f9da3ba700..549131be11 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -38,6 +38,7 @@ once_cell.workspace = true opentelemetry.workspace = true opentelemetry_sdk.workspace = true p256 = { version = "0.13", features = ["pem"] } +pageserver_page_api.workspace = true postgres.workspace = true regex.workspace = true reqwest = { workspace = true, features = ["json"] } @@ -53,6 +54,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tokio-postgres.workspace = true tokio-util.workspace = true tokio-stream.workspace = true +tonic.workspace = true tower-otel.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index bd6ed910be..d0583a192b 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,4 +1,4 @@ -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; use chrono::{DateTime, Utc}; use compute_api::privilege::Privilege; use compute_api::responses::{ @@ -15,6 +15,7 @@ use itertools::Itertools; use nix::sys::signal::{Signal, kill}; use nix::unistd::Pid; use once_cell::sync::Lazy; +use pageserver_page_api as page_api; use postgres; use postgres::NoTls; use postgres::error::SqlState; @@ -29,7 +30,9 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::time::{Duration, Instant}; use std::{env, fs}; +use tokio::io::AsyncReadExt; use tokio::spawn; +use tokio_util::io::StreamReader; use tracing::{Instrument, debug, error, info, instrument, warn}; use url::Url; use utils::id::{TenantId, TimelineId}; @@ -369,7 +372,7 @@ impl ComputeNode { let mut new_state = ComputeState::new(); if let Some(spec) = config.spec { - let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; + let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow!(msg))?; new_state.pspec = Some(pspec); } @@ -941,6 +944,77 @@ impl ComputeNode { #[instrument(skip_all, fields(%lsn))] fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> { let spec = compute_state.pspec.as_ref().expect("spec must be set"); + let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap(); + + match Url::parse(shard0_connstr)?.scheme() { + "postgres" | "postgresql" => self.try_get_basebackup_libpq(spec, lsn), + "grpc" => self.try_get_basebackup_grpc(spec, lsn), + scheme => return Err(anyhow!("unknown URL scheme {scheme}")), + } + } + + fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<()> { + let start_time = Instant::now(); + + let shard0_connstr = spec + .pageserver_connstr + .split(',') + .next() + .unwrap() + .to_string(); + + let chunks = tokio::runtime::Handle::current().block_on(async move { + let mut client = page_api::proto::PageServiceClient::connect(shard0_connstr).await?; + + let req = page_api::proto::GetBaseBackupRequest { + read_lsn: Some(page_api::proto::ReadLsn { + request_lsn: lsn.0, + not_modified_since_lsn: 0, + }), + replica: false, // TODO: handle replicas, with LSN 0 + }; + let mut req = tonic::Request::new(req); + let metadata = req.metadata_mut(); + metadata.insert("neon-tenant-id", spec.tenant_id.to_string().parse()?); + metadata.insert("neon-timeline-id", spec.timeline_id.to_string().parse()?); + metadata.insert("neon-shard-id", "0000".to_string().parse()?); // TODO: shard count + if let Some(auth) = spec.storage_auth_token.as_ref() { + metadata.insert("authorization", format!("Bearer {auth}").parse()?); + } + + let chunks = client.get_base_backup(req).await?.into_inner(); + anyhow::Ok(chunks) + })?; + let pageserver_connect_micros = start_time.elapsed().as_micros() as u64; + + // Convert the chunks stream into an AsyncRead + let stream_reader = StreamReader::new( + chunks.map(|chunk| chunk.map(|c| c.chunk).map_err(std::io::Error::other)), + ); + + // Wrap the AsyncRead into a blocking reader for compatibility with tar::Archive + let reader = tokio_util::io::SyncIoBridge::new(stream_reader); + let mut measured_reader = MeasuredReader::new(reader); + let mut bufreader = std::io::BufReader::new(&mut measured_reader); + + // Read the archive directly from the `CopyOutReader` + // + // Set `ignore_zeros` so that unpack() reads all the Copy data and + // doesn't stop at the end-of-archive marker. Otherwise, if the server + // sends an Error after finishing the tarball, we will not notice it. + let mut ar = tar::Archive::new(&mut bufreader); + ar.set_ignore_zeros(true); + ar.unpack(&self.params.pgdata)?; + + // Report metrics + let mut state = self.state.lock().unwrap(); + state.metrics.pageserver_connect_micros = pageserver_connect_micros; + state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64; + state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64; + Ok(()) + } + + fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<()> { let start_time = Instant::now(); let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap(); @@ -956,12 +1030,10 @@ impl ComputeNode { } config.application_name("compute_ctl"); - if let Some(spec) = &compute_state.pspec { - config.options(&format!( - "-c neon.compute_mode={}", - spec.spec.mode.to_type_str() - )); - } + config.options(&format!( + "-c neon.compute_mode={}", + spec.spec.mode.to_type_str() + )); // Connect to pageserver let mut client = config.connect(NoTls)?; @@ -1035,10 +1107,7 @@ impl ComputeNode { return result; } Err(ref e) if attempts < max_attempts => { - warn!( - "Failed to get basebackup: {} (attempt {}/{})", - e, attempts, max_attempts - ); + warn!("Failed to get basebackup: {e:?} (attempt {attempts}/{max_attempts})"); std::thread::sleep(std::time::Duration::from_millis(retry_period_ms as u64)); retry_period_ms *= 1.5; } @@ -1916,7 +1985,7 @@ LIMIT 100", self.params .remote_ext_base_url .as_ref() - .ok_or(DownloadError::BadInput(anyhow::anyhow!( + .ok_or(DownloadError::BadInput(anyhow!( "Remote extensions storage is not configured", )))?; @@ -2112,7 +2181,7 @@ LIMIT 100", let remote_extensions = spec .remote_extensions .as_ref() - .ok_or(anyhow::anyhow!("Remote extensions are not configured"))?; + .ok_or(anyhow!("Remote extensions are not configured"))?; info!("parse shared_preload_libraries from spec.cluster.settings"); let mut libs_vec = Vec::new(); diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 0e23b70265..c930c365bf 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -4,6 +4,7 @@ //! provide it by calling the compute_ctl's `/compute_ctl` endpoint, or //! compute_ctl can fetch it by calling the control plane's API. use std::collections::HashMap; +use std::fmt::Display; use indexmap::IndexMap; use regex::Regex; @@ -319,6 +320,12 @@ impl ComputeMode { } } +impl Display for ComputeMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.to_type_str()) + } +} + /// Log level for audit logging #[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)] pub enum ComputeAudit { diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index b3eeaece22..8ed20a7c8b 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -195,11 +195,25 @@ impl TryFrom for GetBaseBackupRequest { type Error = ProtocolError; fn try_from(pb: proto::GetBaseBackupRequest) -> Result { - Ok(Self { - read_lsn: pb - .read_lsn + // Allow 0 read_lsn for base backups. + // TODO: reconsider requiring request_lsn > 0. + let zero = proto::ReadLsn { + request_lsn: 0, + not_modified_since_lsn: 0, + }; + let read_lsn = if pb.read_lsn == Some(zero) || pb.read_lsn.is_none() { + ReadLsn { + request_lsn: Lsn(0), + not_modified_since_lsn: None, + } + } else { + pb.read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? - .try_into()?, + .try_into()? + }; + + Ok(Self { + read_lsn, replica: pb.replica, }) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 4a1ddf09b5..cd9cb2c61c 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -14,7 +14,7 @@ use std::{io, str}; use anyhow::{Context as _, anyhow, bail}; use async_compression::tokio::write::GzipEncoder; -use bytes::{Buf, BytesMut}; +use bytes::{Buf, BufMut as _, BytesMut}; use futures::future::BoxFuture; use futures::{FutureExt, Stream}; use itertools::Itertools; @@ -3610,20 +3610,24 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(lsn=%req.read_lsn); - let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); - timeline - .wait_lsn( - req.read_lsn.request_lsn, - WaitLsnWaiter::PageService, - WaitLsnTimeout::Default, - &ctx, - ) - .await?; - timeline - .check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn) - .map_err(|err| { - tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}")) - })?; + let mut lsn = None; + if req.read_lsn.request_lsn > Lsn(0) { + lsn = Some(req.read_lsn.request_lsn); + let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); + timeline + .wait_lsn( + req.read_lsn.request_lsn, + WaitLsnWaiter::PageService, + WaitLsnTimeout::Default, + &ctx, + ) + .await?; + timeline + .check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn) + .map_err(|err| { + tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}")) + })?; + } // Spawn a task to run the basebackup. // @@ -3634,7 +3638,7 @@ impl proto::PageService for GrpcPageServiceHandler { let result = basebackup::send_basebackup_tarball( &mut simplex_write, &timeline, - Some(req.read_lsn.request_lsn), + lsn, None, false, req.replica, @@ -3650,20 +3654,21 @@ impl proto::PageService for GrpcPageServiceHandler { // Emit chunks of size CHUNK_SIZE. let chunks = async_stream::try_stream! { - let mut chunk = BytesMut::with_capacity(CHUNK_SIZE); loop { - let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| { - tonic::Status::internal(format!("failed to read basebackup chunk: {err}")) - })?; + let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE); + let mut n = 1; + while n != 0 { + n = simplex_read.read_buf(&mut chunk).await.map_err(|err| { + tonic::Status::internal(format!("failed to read basebackup chunk: {err}")) + })?; + } + let chunk = chunk.into_inner(); // If we read 0 bytes, either the chunk is full or the stream is closed. - if n == 0 { - if chunk.is_empty() { - break; - } - yield proto::GetBaseBackupResponseChunk::from(chunk.clone().freeze()); - chunk.clear(); + if chunk.is_empty() { + break; } + yield proto::GetBaseBackupResponseChunk::from(chunk.freeze()); } // Wait for the basebackup task to exit and check for errors. jh.await.map_err(|err| {