mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
Tweak base backups
This commit is contained in:
@@ -967,10 +967,7 @@ impl ComputeNode {
|
||||
let mut client = page_api::proto::PageServiceClient::connect(shard0_connstr).await?;
|
||||
|
||||
let req = page_api::proto::GetBaseBackupRequest {
|
||||
read_lsn: Some(page_api::proto::ReadLsn {
|
||||
request_lsn: lsn.0,
|
||||
not_modified_since_lsn: 0,
|
||||
}),
|
||||
lsn: lsn.0,
|
||||
replica: false, // TODO: handle replicas, with LSN 0
|
||||
};
|
||||
let mut req = tonic::Request::new(req);
|
||||
|
||||
@@ -104,8 +104,8 @@ message CheckRelExistsResponse {
|
||||
|
||||
// Requests a base backup at a given LSN.
|
||||
message GetBaseBackupRequest {
|
||||
// The LSN to fetch a base backup at.
|
||||
ReadLsn read_lsn = 1;
|
||||
// The LSN to fetch a base backup at. 0 or absent means the latest LSN known to the Pageserver.
|
||||
uint64 lsn = 1;
|
||||
// If true, logical replication slots will not be created.
|
||||
bool replica = 2;
|
||||
}
|
||||
|
||||
@@ -185,44 +185,25 @@ impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
|
||||
/// Requests a base backup at a given LSN.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GetBaseBackupRequest {
|
||||
/// The LSN to fetch a base backup at.
|
||||
pub read_lsn: ReadLsn,
|
||||
/// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver.
|
||||
pub lsn: Option<Lsn>,
|
||||
/// If true, logical replication slots will not be created.
|
||||
pub replica: bool,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
|
||||
// Allow 0 read_lsn for base backups.
|
||||
// TODO: reconsider requiring request_lsn > 0.
|
||||
let zero = proto::ReadLsn {
|
||||
request_lsn: 0,
|
||||
not_modified_since_lsn: 0,
|
||||
};
|
||||
let read_lsn = if pb.read_lsn == Some(zero) || pb.read_lsn.is_none() {
|
||||
ReadLsn {
|
||||
request_lsn: Lsn(0),
|
||||
not_modified_since_lsn: None,
|
||||
}
|
||||
} else {
|
||||
pb.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
read_lsn,
|
||||
impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
fn from(pb: proto::GetBaseBackupRequest) -> Self {
|
||||
Self {
|
||||
lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
|
||||
replica: pb.replica,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
|
||||
fn from(request: GetBaseBackupRequest) -> Self {
|
||||
Self {
|
||||
read_lsn: Some(request.read_lsn.into()),
|
||||
lsn: request.lsn.unwrap_or_default().0,
|
||||
replica: request.replica,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,10 +320,7 @@ async fn client_grpc(
|
||||
let mut basebackup_stream = client
|
||||
.get_base_backup(
|
||||
GetBaseBackupRequest {
|
||||
read_lsn: ReadLsn {
|
||||
request_lsn: lsn,
|
||||
not_modified_since_lsn: Some(lsn),
|
||||
},
|
||||
lsn: Some(lsn),
|
||||
replica: false,
|
||||
},
|
||||
gzip,
|
||||
|
||||
@@ -3606,24 +3606,22 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
if timeline.is_archived() == Some(true) {
|
||||
return Err(tonic::Status::failed_precondition("timeline is archived"));
|
||||
}
|
||||
let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
|
||||
let req: page_api::GetBaseBackupRequest = req.into_inner().into();
|
||||
|
||||
span_record!(lsn=%req.read_lsn);
|
||||
span_record!(lsn=?req.lsn);
|
||||
|
||||
let mut lsn = None;
|
||||
if req.read_lsn.request_lsn > Lsn(0) {
|
||||
lsn = Some(req.read_lsn.request_lsn);
|
||||
if let Some(lsn) = req.lsn {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
timeline
|
||||
.wait_lsn(
|
||||
req.read_lsn.request_lsn,
|
||||
lsn,
|
||||
WaitLsnWaiter::PageService,
|
||||
WaitLsnTimeout::Default,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
timeline
|
||||
.check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn)
|
||||
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
|
||||
.map_err(|err| {
|
||||
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
|
||||
})?;
|
||||
@@ -3631,14 +3629,15 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
|
||||
// Spawn a task to run the basebackup.
|
||||
//
|
||||
// TODO: do we need to support full base backups, for debugging?
|
||||
// TODO: do we need to support full base backups, for debugging? This also requires passing
|
||||
// the prev_lsn parameter.
|
||||
let span = Span::current();
|
||||
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
|
||||
let jh = tokio::spawn(async move {
|
||||
let result = basebackup::send_basebackup_tarball(
|
||||
&mut simplex_write,
|
||||
&timeline,
|
||||
lsn,
|
||||
req.lsn,
|
||||
None,
|
||||
false,
|
||||
req.replica,
|
||||
@@ -3656,19 +3655,19 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
let chunks = async_stream::try_stream! {
|
||||
loop {
|
||||
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
|
||||
let mut n = 1;
|
||||
while n != 0 {
|
||||
n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
|
||||
loop {
|
||||
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
|
||||
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
|
||||
})?;
|
||||
if n == 0 {
|
||||
break; // full chunk or closed stream
|
||||
}
|
||||
}
|
||||
let chunk = chunk.into_inner();
|
||||
|
||||
// If we read 0 bytes, either the chunk is full or the stream is closed.
|
||||
let chunk = chunk.into_inner().freeze();
|
||||
if chunk.is_empty() {
|
||||
break;
|
||||
}
|
||||
yield proto::GetBaseBackupResponseChunk::from(chunk.freeze());
|
||||
yield proto::GetBaseBackupResponseChunk::from(chunk);
|
||||
}
|
||||
// Wait for the basebackup task to exit and check for errors.
|
||||
jh.await.map_err(|err| {
|
||||
|
||||
Reference in New Issue
Block a user