From d81353b2d1263ce38b5381a914d1f0c7eef3c191 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Tue, 17 Jun 2025 05:37:43 -0700
Subject: [PATCH] pageserver: gRPC base backup fixes (#12243)

## Problem

The gRPC base backup implementation has a few issues: chunks are not properly bounded, and it's not possible to omit the LSN.

Touches #11728.

## Summary of changes

* Properly bound chunks by using a limited writer.
* Use an `Option<Lsn>` rather than a `ReadLsn` (the latter requires an LSN).
---
 pageserver/page_api/proto/page_service.proto |  6 +-
 pageserver/page_api/src/model.rs             | 23 +++----
 pageserver/src/page_service.rs               | 69 ++++++++++----------
 3 files changed, 48 insertions(+), 50 deletions(-)

diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto
index 44976084bf..7d01dec4ab 100644
--- a/pageserver/page_api/proto/page_service.proto
+++ b/pageserver/page_api/proto/page_service.proto
@@ -102,10 +102,10 @@ message CheckRelExistsResponse {
   bool exists = 1;
 }
 
-// Requests a base backup at a given LSN.
+// Requests a base backup.
 message GetBaseBackupRequest {
-  // The LSN to fetch a base backup at.
-  ReadLsn read_lsn = 1;
+  // The LSN to fetch the base backup at. 0 or absent means the latest LSN known to the Pageserver.
+  uint64 lsn = 1;
   // If true, logical replication slots will not be created.
   bool replica = 2;
 }
diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs
index 664ac0e6c4..799f48712f 100644
--- a/pageserver/page_api/src/model.rs
+++ b/pageserver/page_api/src/model.rs
@@ -182,33 +182,28 @@ impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
     }
 }
 
-/// Requests a base backup at a given LSN.
+/// Requests a base backup.
 #[derive(Clone, Copy, Debug)]
 pub struct GetBaseBackupRequest {
-    /// The LSN to fetch a base backup at.
-    pub read_lsn: ReadLsn,
+    /// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver.
+    pub lsn: Option<Lsn>,
     /// If true, logical replication slots will not be created.
     pub replica: bool,
 }
 
-impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
-    type Error = ProtocolError;
-
-    fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
-        Ok(Self {
-            read_lsn: pb
-                .read_lsn
-                .ok_or(ProtocolError::Missing("read_lsn"))?
-                .try_into()?,
+impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
+    fn from(pb: proto::GetBaseBackupRequest) -> Self {
+        Self {
+            lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
             replica: pb.replica,
-        })
+        }
     }
 }
 
 impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
     fn from(request: GetBaseBackupRequest) -> Self {
         Self {
-            read_lsn: Some(request.read_lsn.into()),
+            lsn: request.lsn.unwrap_or_default().0,
             replica: request.replica,
         }
     }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index d47f6bd095..0521f5c556 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -14,7 +14,7 @@ use std::{io, str};
 
 use anyhow::{Context as _, anyhow, bail};
 use async_compression::tokio::write::GzipEncoder;
-use bytes::{Buf, BytesMut};
+use bytes::{Buf as _, BufMut as _, BytesMut};
 use futures::future::BoxFuture;
 use futures::{FutureExt, Stream};
 use itertools::Itertools;
@@ -3601,42 +3601,44 @@ impl proto::PageService for GrpcPageServiceHandler {
         let timeline = self.get_request_timeline(&req).await?;
         let ctx = self.ctx.with_scope_timeline(&timeline);
 
-        // Validate the request, decorate the span, and wait for the LSN to arrive.
-        //
-        // TODO: this requires a read LSN, is that ok?
+        // Validate the request and decorate the span.
         Self::ensure_shard_zero(&timeline)?;
         if timeline.is_archived() == Some(true) {
             return Err(tonic::Status::failed_precondition("timeline is archived"));
         }
-        let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
+        let req: page_api::GetBaseBackupRequest = req.into_inner().into();
 
-        span_record!(lsn=%req.read_lsn);
+        span_record!(lsn=?req.lsn);
 
-        let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
-        timeline
-            .wait_lsn(
-                req.read_lsn.request_lsn,
-                WaitLsnWaiter::PageService,
-                WaitLsnTimeout::Default,
-                &ctx,
-            )
-            .await?;
-        timeline
-            .check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn)
-            .map_err(|err| {
-                tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
-            })?;
+        // Wait for the LSN to arrive, if given.
+        if let Some(lsn) = req.lsn {
+            let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
+            timeline
+                .wait_lsn(
+                    lsn,
+                    WaitLsnWaiter::PageService,
+                    WaitLsnTimeout::Default,
+                    &ctx,
+                )
+                .await?;
+            timeline
+                .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
+                .map_err(|err| {
+                    tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
+                })?;
+        }
 
         // Spawn a task to run the basebackup.
         //
-        // TODO: do we need to support full base backups, for debugging?
+        // TODO: do we need to support full base backups, for debugging? This also requires passing
+        // the prev_lsn parameter.
         let span = Span::current();
         let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
         let jh = tokio::spawn(async move {
             let result = basebackup::send_basebackup_tarball(
                 &mut simplex_write,
                 &timeline,
-                Some(req.read_lsn.request_lsn),
+                req.lsn,
                 None,
                 false,
                 req.replica,
@@ -3652,20 +3654,21 @@ impl proto::PageService for GrpcPageServiceHandler {
 
         // Emit chunks of size CHUNK_SIZE.
         let chunks = async_stream::try_stream! {
-            let mut chunk = BytesMut::with_capacity(CHUNK_SIZE);
             loop {
-                let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
-                    tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
-                })?;
-
-                // If we read 0 bytes, either the chunk is full or the stream is closed.
-                if n == 0 {
-                    if chunk.is_empty() {
-                        break;
+                let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
+                loop {
+                    let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
+                        tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
+                    })?;
+                    if n == 0 {
+                        break; // full chunk or closed stream
                     }
-                    yield proto::GetBaseBackupResponseChunk::from(chunk.clone().freeze());
-                    chunk.clear();
                 }
+                let chunk = chunk.into_inner().freeze();
+                if chunk.is_empty() {
+                    break;
+                }
+                yield proto::GetBaseBackupResponseChunk::from(chunk);
             }
 
             // Wait for the basebackup task to exit and check for errors.
             jh.await.map_err(|err| {
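For reference, here is a minimal, self-contained sketch of the bounded-chunk read loop introduced in the last hunk. Only the `BytesMut::with_capacity(..).limit(..)` / `read_buf` pattern mirrors the patch; the `CHUNK_SIZE` value, the 40-byte payload, and the producer task are illustrative assumptions standing in for the Pageserver's basebackup pipe (a recent Tokio with `tokio::io::simplex` is assumed).

```rust
use bytes::{BufMut as _, BytesMut};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};

// Illustrative chunk size; the real CHUNK_SIZE constant lives in page_service.rs.
const CHUNK_SIZE: usize = 16;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-in producer: writes 40 bytes into a simplex pipe, like the basebackup task.
    let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
    tokio::spawn(async move {
        simplex_write.write_all(&[b'x'; 40]).await.unwrap();
        simplex_write.shutdown().await.unwrap(); // signal end of stream
    });

    // Consume the stream in chunks of at most CHUNK_SIZE bytes.
    loop {
        // `limit()` caps the writable capacity, so `read_buf` returns 0 once the
        // chunk holds CHUNK_SIZE bytes (or the stream is closed) instead of
        // growing the BytesMut indefinitely.
        let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
        loop {
            let n = simplex_read.read_buf(&mut chunk).await?;
            if n == 0 {
                break; // full chunk or closed stream
            }
        }
        let chunk = chunk.into_inner().freeze();
        if chunk.is_empty() {
            break; // closed stream and nothing buffered: we're done
        }
        println!("emitting chunk of {} bytes", chunk.len());
    }
    Ok(())
}
```

Without the `.limit(CHUNK_SIZE)` wrapper, `read_buf` keeps growing the plain `BytesMut` past `CHUNK_SIZE`, which is the unbounded-chunk issue described in the Problem section.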