mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-27 08:09:58 +00:00
pageserver: use base backup cache with gRPC (#12352)
## Problem gRPC base backups do not use the base backup cache. Touches https://github.com/neondatabase/neon/issues/11728. ## Summary of changes Integrate gRPC base backups with the base backup cache. Also fixes a bug where the base backup cache did not differentiate between primary/replica base backups (at least I think that's a bug?).
This commit is contained in:
@@ -12,7 +12,7 @@ use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{io, str};
|
||||
|
||||
use anyhow::{Context as _, anyhow, bail};
|
||||
use anyhow::{Context as _, bail};
|
||||
use bytes::{Buf as _, BufMut as _, BytesMut};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{FutureExt, Stream};
|
||||
@@ -2608,18 +2608,9 @@ impl PageServerHandler {
|
||||
} else {
|
||||
let mut writer = BufWriter::new(pgb.copyout_writer());
|
||||
|
||||
let cached = {
|
||||
// Basebackup is cached only for this combination of parameters.
|
||||
if timeline.is_basebackup_cache_enabled()
|
||||
&& gzip
|
||||
&& lsn.is_some()
|
||||
&& prev_lsn.is_none()
|
||||
{
|
||||
timeline.get_cached_basebackup(lsn.unwrap()).await
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
let cached = timeline
|
||||
.get_cached_basebackup_if_enabled(lsn, prev_lsn, full_backup, replica, gzip)
|
||||
.await;
|
||||
|
||||
if let Some(mut cached) = cached {
|
||||
from_cache = true;
|
||||
@@ -3555,21 +3546,41 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
page_api::BaseBackupCompression::Gzip => Some(async_compression::Level::Fastest),
|
||||
};
|
||||
|
||||
let result = basebackup::send_basebackup_tarball(
|
||||
&mut simplex_write,
|
||||
&timeline,
|
||||
req.lsn,
|
||||
None,
|
||||
req.full,
|
||||
req.replica,
|
||||
gzip_level,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(span) // propagate request span
|
||||
.await;
|
||||
simplex_write.shutdown().await.map_err(|err| {
|
||||
BasebackupError::Server(anyhow!("simplex shutdown failed: {err}"))
|
||||
})?;
|
||||
// Check for a cached basebackup.
|
||||
let cached = timeline
|
||||
.get_cached_basebackup_if_enabled(
|
||||
req.lsn,
|
||||
None,
|
||||
req.full,
|
||||
req.replica,
|
||||
gzip_level.is_some(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let result = if let Some(mut cached) = cached {
|
||||
// If we have a cached basebackup, send it.
|
||||
tokio::io::copy(&mut cached, &mut simplex_write)
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|err| BasebackupError::Client(err, "cached,copy"))
|
||||
} else {
|
||||
basebackup::send_basebackup_tarball(
|
||||
&mut simplex_write,
|
||||
&timeline,
|
||||
req.lsn,
|
||||
None,
|
||||
req.full,
|
||||
req.replica,
|
||||
gzip_level,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(span) // propagate request span
|
||||
.await
|
||||
};
|
||||
simplex_write
|
||||
.shutdown()
|
||||
.await
|
||||
.map_err(|err| BasebackupError::Client(err, "simplex_write"))?;
|
||||
result
|
||||
});
|
||||
|
||||
|
||||
@@ -2507,6 +2507,30 @@ impl Timeline {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Convenience method to attempt fetching a basebackup for the timeline if enabled and safe for
|
||||
/// the given request parameters.
|
||||
///
|
||||
/// TODO: consider moving this onto GrpcPageServiceHandler once the libpq handler is gone.
|
||||
pub async fn get_cached_basebackup_if_enabled(
|
||||
&self,
|
||||
lsn: Option<Lsn>,
|
||||
prev_lsn: Option<Lsn>,
|
||||
full: bool,
|
||||
replica: bool,
|
||||
gzip: bool,
|
||||
) -> Option<tokio::fs::File> {
|
||||
if !self.is_basebackup_cache_enabled() || !self.basebackup_cache.is_enabled() {
|
||||
return None;
|
||||
}
|
||||
// We have to know which LSN to fetch the basebackup for.
|
||||
let lsn = lsn?;
|
||||
// We only cache gzipped, non-full basebackups for primary computes with automatic prev_lsn.
|
||||
if prev_lsn.is_some() || full || replica || !gzip {
|
||||
return None;
|
||||
}
|
||||
self.get_cached_basebackup(lsn).await
|
||||
}
|
||||
|
||||
/// Prepare basebackup for the given LSN and store it in the basebackup cache.
|
||||
/// The method is asynchronous and returns immediately.
|
||||
/// The actual basebackup preparation is performed in the background
|
||||
|
||||
Reference in New Issue
Block a user