From d0b3629412e58727458f8ad5a1082063ee56bef2 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 13 Jun 2025 13:47:26 -0700 Subject: [PATCH] Tweak base backups --- compute_tools/src/compute.rs | 5 +-- pageserver/page_api/proto/page_service.proto | 4 +-- pageserver/page_api/src/model.rs | 35 +++++--------------- pageserver/pagebench/src/cmd/basebackup.rs | 5 +-- pageserver/src/page_service.rs | 31 +++++++++-------- 5 files changed, 27 insertions(+), 53 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index d0583a192b..852ae4e4f4 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -967,10 +967,7 @@ impl ComputeNode { let mut client = page_api::proto::PageServiceClient::connect(shard0_connstr).await?; let req = page_api::proto::GetBaseBackupRequest { - read_lsn: Some(page_api::proto::ReadLsn { - request_lsn: lsn.0, - not_modified_since_lsn: 0, - }), + lsn: lsn.0, replica: false, // TODO: handle replicas, with LSN 0 }; let mut req = tonic::Request::new(req); diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto index 44976084bf..4004af97d0 100644 --- a/pageserver/page_api/proto/page_service.proto +++ b/pageserver/page_api/proto/page_service.proto @@ -104,8 +104,8 @@ message CheckRelExistsResponse { // Requests a base backup at a given LSN. message GetBaseBackupRequest { - // The LSN to fetch a base backup at. - ReadLsn read_lsn = 1; + // The LSN to fetch a base backup at. 0 or absent means the latest LSN known to the Pageserver. + uint64 lsn = 1; // If true, logical replication slots will not be created. bool replica = 2; } diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 8ed20a7c8b..cf6cc2740c 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -185,44 +185,25 @@ impl From for proto::CheckRelExistsResponse { /// Requests a base backup at a given LSN. #[derive(Clone, Copy, Debug)] pub struct GetBaseBackupRequest { - /// The LSN to fetch a base backup at. - pub read_lsn: ReadLsn, + /// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver. + pub lsn: Option, /// If true, logical replication slots will not be created. pub replica: bool, } -impl TryFrom for GetBaseBackupRequest { - type Error = ProtocolError; - - fn try_from(pb: proto::GetBaseBackupRequest) -> Result { - // Allow 0 read_lsn for base backups. - // TODO: reconsider requiring request_lsn > 0. - let zero = proto::ReadLsn { - request_lsn: 0, - not_modified_since_lsn: 0, - }; - let read_lsn = if pb.read_lsn == Some(zero) || pb.read_lsn.is_none() { - ReadLsn { - request_lsn: Lsn(0), - not_modified_since_lsn: None, - } - } else { - pb.read_lsn - .ok_or(ProtocolError::Missing("read_lsn"))? - .try_into()? - }; - - Ok(Self { - read_lsn, +impl From for GetBaseBackupRequest { + fn from(pb: proto::GetBaseBackupRequest) -> Self { + Self { + lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)), replica: pb.replica, - }) + } } } impl From for proto::GetBaseBackupRequest { fn from(request: GetBaseBackupRequest) -> Self { Self { - read_lsn: Some(request.read_lsn.into()), + lsn: request.lsn.unwrap_or_default().0, replica: request.replica, } } diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 676f157e69..e337df9d5c 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -320,10 +320,7 @@ async fn client_grpc( let mut basebackup_stream = client .get_base_backup( GetBaseBackupRequest { - read_lsn: ReadLsn { - request_lsn: lsn, - not_modified_since_lsn: Some(lsn), - }, + lsn: Some(lsn), replica: false, }, gzip, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index cd9cb2c61c..cd48f5455b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3606,24 +3606,22 @@ impl proto::PageService for GrpcPageServiceHandler { if timeline.is_archived() == Some(true) { return Err(tonic::Status::failed_precondition("timeline is archived")); } - let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?; + let req: page_api::GetBaseBackupRequest = req.into_inner().into(); - span_record!(lsn=%req.read_lsn); + span_record!(lsn=?req.lsn); - let mut lsn = None; - if req.read_lsn.request_lsn > Lsn(0) { - lsn = Some(req.read_lsn.request_lsn); + if let Some(lsn) = req.lsn { let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); timeline .wait_lsn( - req.read_lsn.request_lsn, + lsn, WaitLsnWaiter::PageService, WaitLsnTimeout::Default, &ctx, ) .await?; timeline - .check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn) + .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn) .map_err(|err| { tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}")) })?; @@ -3631,14 +3629,15 @@ impl proto::PageService for GrpcPageServiceHandler { // Spawn a task to run the basebackup. // - // TODO: do we need to support full base backups, for debugging? + // TODO: do we need to support full base backups, for debugging? This also requires passing + // the prev_lsn parameter. let span = Span::current(); let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE); let jh = tokio::spawn(async move { let result = basebackup::send_basebackup_tarball( &mut simplex_write, &timeline, - lsn, + req.lsn, None, false, req.replica, @@ -3656,19 +3655,19 @@ impl proto::PageService for GrpcPageServiceHandler { let chunks = async_stream::try_stream! { loop { let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE); - let mut n = 1; - while n != 0 { - n = simplex_read.read_buf(&mut chunk).await.map_err(|err| { + loop { + let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| { tonic::Status::internal(format!("failed to read basebackup chunk: {err}")) })?; + if n == 0 { + break; // full chunk or closed stream + } } - let chunk = chunk.into_inner(); - - // If we read 0 bytes, either the chunk is full or the stream is closed. + let chunk = chunk.into_inner().freeze(); if chunk.is_empty() { break; } - yield proto::GetBaseBackupResponseChunk::from(chunk.freeze()); + yield proto::GetBaseBackupResponseChunk::from(chunk); } // Wait for the basebackup task to exit and check for errors. jh.await.map_err(|err| {