Mirror of https://github.com/neondatabase/neon.git (synced 2025-12-22 21:59:59 +00:00)
pageserver: payload compression for gRPC base backups (#12346)
## Problem

gRPC base backups currently rely on gRPC-level compression. This has two problems:

* Base backup caching will cache compressed base backups, making gRPC compression pointless.
* Tonic does not support varying the compression level, and zstd's default level is 10% slower than gzip's fastest level.

Touches https://github.com/neondatabase/neon/issues/11728.
Touches https://github.com/neondatabase/cloud/issues/29353.

## Summary of changes

This patch adds a gRPC parameter `BaseBackupRequest::compression` specifying the compression algorithm, so the payload itself is compressed instead of the gRPC channel. It also moves compression into `send_basebackup_tarball` to reduce code duplication.

A follow-up PR will integrate the base backup cache with gRPC.
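For orientation, here is a rough sketch of what the new request path looks like from a client's point of view, based on the `page_api` types in this diff. The helper name, error type, and the exact item/error types of the response stream are assumptions for illustration; only the request shape and the fact that the payload (not the gRPC channel) carries the compression come from this patch.

```rust
use futures::StreamExt as _;

/// Hypothetical helper (not part of this commit): request a gzip-compressed
/// base backup and collect the compressed tarball bytes.
async fn fetch_compressed_base_backup(
    client: &mut page_api::Client,
) -> anyhow::Result<Vec<u8>> {
    let req = page_api::GetBaseBackupRequest {
        lsn: None, // None = latest LSN
        replica: false,
        full: false,
        // New in this patch: payload compression instead of gRPC compression,
        // so the server can cache the compressed form.
        compression: page_api::BaseBackupCompression::Gzip,
    };

    // The response is an ordered stream of chunks; item/error types are assumed here.
    let mut stream = client.get_base_backup(req).await?;
    let mut compressed = Vec::new();
    while let Some(chunk) = stream.next().await {
        compressed.extend_from_slice(&chunk?);
    }
    // `compressed` now holds a gzip tarball; the caller decompresses it itself.
    Ok(compressed)
}
```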
@@ -110,6 +110,19 @@ message GetBaseBackupRequest {
bool replica = 2;
// If true, include relation files in the base backup. Mainly for debugging and tests.
bool full = 3;
// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
// compression, so that we can cache compressed backups on the server.
BaseBackupCompression compression = 4;
}

// Base backup compression algorithms.
enum BaseBackupCompression {
// Unknown algorithm. Used when clients send an unsupported algorithm.
BASE_BACKUP_COMPRESSION_UNKNOWN = 0;
// No compression.
BASE_BACKUP_COMPRESSION_NONE = 1;
// GZIP compression.
BASE_BACKUP_COMPRESSION_GZIP = 2;
}

// Base backup response chunk, returned as an ordered stream.
@@ -95,7 +95,6 @@ impl Client {

if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
// TODO: consider enabling compression by default.
client = client
.accept_compressed(compression)
.send_compressed(compression);
@@ -191,15 +191,21 @@ pub struct GetBaseBackupRequest {
pub replica: bool,
/// If true, include relation files in the base backup. Mainly for debugging and tests.
pub full: bool,
/// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
/// compression, so that we can cache compressed backups on the server.
pub compression: BaseBackupCompression,
}

impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
fn from(pb: proto::GetBaseBackupRequest) -> Self {
Self {
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
type Error = ProtocolError;

fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
Ok(Self {
lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
replica: pb.replica,
full: pb.full,
}
compression: pb.compression.try_into()?,
})
}
}
@@ -209,10 +215,55 @@ impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
lsn: request.lsn.unwrap_or_default().0,
replica: request.replica,
full: request.full,
compression: request.compression.into(),
}
}
}

/// Base backup compression algorithm.
#[derive(Clone, Copy, Debug)]
pub enum BaseBackupCompression {
None,
Gzip,
}

impl TryFrom<proto::BaseBackupCompression> for BaseBackupCompression {
type Error = ProtocolError;

fn try_from(pb: proto::BaseBackupCompression) -> Result<Self, Self::Error> {
match pb {
proto::BaseBackupCompression::Unknown => Err(ProtocolError::invalid("compression", pb)),
proto::BaseBackupCompression::None => Ok(Self::None),
proto::BaseBackupCompression::Gzip => Ok(Self::Gzip),
}
}
}

impl TryFrom<i32> for BaseBackupCompression {
type Error = ProtocolError;

fn try_from(compression: i32) -> Result<Self, Self::Error> {
proto::BaseBackupCompression::try_from(compression)
.map_err(|_| ProtocolError::invalid("compression", compression))
.and_then(Self::try_from)
}
}

impl From<BaseBackupCompression> for proto::BaseBackupCompression {
fn from(compression: BaseBackupCompression) -> Self {
match compression {
BaseBackupCompression::None => Self::None,
BaseBackupCompression::Gzip => Self::Gzip,
}
}
}

impl From<BaseBackupCompression> for i32 {
fn from(compression: BaseBackupCompression) -> Self {
proto::BaseBackupCompression::from(compression).into()
}
}

pub type GetBaseBackupResponseChunk = Bytes;

impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
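One detail worth noting in the conversions above: in proto3 an enum field that is never set decodes as 0, i.e. `BASE_BACKUP_COMPRESSION_UNKNOWN`, so a client that omits `compression` is rejected with a protocol error instead of silently getting a default algorithm. A minimal, hypothetical check of the wire mapping, assuming the `TryFrom`/`From` impls land as shown:

```rust
/// Hypothetical sanity check, not part of this commit; assumes the conversions above.
fn base_backup_compression_wire_mapping() {
    // 0 == BASE_BACKUP_COMPRESSION_UNKNOWN: rejected by the TryFrom<i32> impl.
    assert!(page_api::BaseBackupCompression::try_from(0).is_err());
    // 2 == BASE_BACKUP_COMPRESSION_GZIP: accepted and round-trips back to the wire value.
    assert!(page_api::BaseBackupCompression::try_from(2).is_ok());
    assert_eq!(i32::from(page_api::BaseBackupCompression::Gzip), 2);
}
```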
@@ -317,6 +317,7 @@ impl Client for LibpqClient {
/// A gRPC Pageserver client.
struct GrpcClient {
inner: page_api::Client,
compression: page_api::BaseBackupCompression,
}

impl GrpcClient {
@@ -331,10 +332,14 @@ impl GrpcClient {
ttid.timeline_id,
ShardIndex::unsharded(),
None,
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
None, // NB: uses payload compression
)
.await?;
Ok(Self { inner })
let compression = match compression {
true => page_api::BaseBackupCompression::Gzip,
false => page_api::BaseBackupCompression::None,
};
Ok(Self { inner, compression })
}
}
@@ -348,6 +353,7 @@ impl Client for GrpcClient {
lsn,
replica: false,
full: false,
compression: self.compression,
};
let stream = self.inner.get_base_backup(req).await?;
Ok(Box::pin(StreamReader::new(
@@ -14,6 +14,7 @@ use std::fmt::Write as FmtWrite;
use std::time::{Instant, SystemTime};

use anyhow::{Context, anyhow};
use async_compression::tokio::write::GzipEncoder;
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{Key, rel_block_to_key};
@@ -25,8 +26,7 @@ use postgres_ffi::{
};
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM};
use tokio::io;
use tokio::io::AsyncWrite;
use tokio::io::{self, AsyncWrite, AsyncWriteExt as _};
use tokio_tar::{Builder, EntryType, Header};
use tracing::*;
use utils::lsn::Lsn;
@@ -97,6 +97,7 @@ impl From<BasebackupError> for tonic::Status {
/// * When working without safekeepers. In this situation it is important to match the lsn
/// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
/// to start the replication.
#[allow(clippy::too_many_arguments)]
pub async fn send_basebackup_tarball<'a, W>(
write: &'a mut W,
timeline: &'a Timeline,
@@ -104,6 +105,7 @@ pub async fn send_basebackup_tarball<'a, W>(
prev_lsn: Option<Lsn>,
full_backup: bool,
replica: bool,
gzip_level: Option<async_compression::Level>,
ctx: &'a RequestContext,
) -> Result<(), BasebackupError>
where
@@ -122,7 +124,7 @@ where
// prev_lsn value; that happens if the timeline was just branched from
// an old LSN and it doesn't have any WAL of its own yet. We will set
// prev_lsn to Lsn(0) if we cannot provide the correct value.
let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
let (backup_prev, lsn) = if let Some(req_lsn) = req_lsn {
// Backup was requested at a particular LSN. The caller should've
// already checked that it's a valid LSN.
@@ -143,7 +145,7 @@ where
};

// Consolidate the derived and the provided prev_lsn values
let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
let prev_record_lsn = if let Some(provided_prev_lsn) = prev_lsn {
if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
return Err(BasebackupError::Server(anyhow!(
"backup_prev {backup_prev} != provided_prev_lsn {provided_prev_lsn}"
@@ -155,30 +157,55 @@ where
};

info!(
"taking basebackup lsn={}, prev_lsn={} (full_backup={}, replica={})",
backup_lsn, prev_lsn, full_backup, replica
"taking basebackup lsn={lsn}, prev_lsn={prev_record_lsn} \
(full_backup={full_backup}, replica={replica}, gzip={gzip_level:?})",
);
let span = info_span!("send_tarball", backup_lsn=%lsn);

let io_concurrency = IoConcurrency::spawn_from_conf(
timeline.conf.get_vectored_concurrent_io,
timeline
.gate
.enter()
.map_err(|_| BasebackupError::Shutdown)?,
);

let basebackup = Basebackup {
ar: Builder::new_non_terminated(write),
timeline,
lsn: backup_lsn,
prev_record_lsn: prev_lsn,
full_backup,
replica,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf.get_vectored_concurrent_io,
timeline
.gate
.enter()
.map_err(|_| BasebackupError::Shutdown)?,
),
};
basebackup
if let Some(gzip_level) = gzip_level {
let mut encoder = GzipEncoder::with_quality(write, gzip_level);
Basebackup {
ar: Builder::new_non_terminated(&mut encoder),
timeline,
lsn,
prev_record_lsn,
full_backup,
replica,
ctx,
io_concurrency,
}
.send_tarball()
.instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
.await
.instrument(span)
.await?;
encoder
.shutdown()
.await
.map_err(|err| BasebackupError::Client(err, "gzip"))?;
} else {
Basebackup {
ar: Builder::new_non_terminated(write),
timeline,
lsn,
prev_record_lsn,
full_backup,
replica,
ctx,
io_concurrency,
}
.send_tarball()
.instrument(span)
.await?;
}

Ok(())
}

/// This is short-living object only for the time of tarball creation,
@@ -1,7 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
@@ -594,13 +593,6 @@ impl BackgroundTask {
let file = tokio::fs::File::create(entry_tmp_path).await?;
let mut writer = BufWriter::new(file);

let mut encoder = GzipEncoder::with_quality(
&mut writer,
// Level::Best because compression is not on the hot path of basebackup requests.
// The decompression is almost not affected by the compression level.
async_compression::Level::Best,
);

// We may receive a request before the WAL record is applied to the timeline.
// Wait for the requested LSN to be applied.
timeline
@@ -613,17 +605,19 @@ impl BackgroundTask {
.await?;

send_basebackup_tarball(
&mut encoder,
&mut writer,
timeline,
Some(req_lsn),
None,
false,
false,
// Level::Best because compression is not on the hot path of basebackup requests.
// The decompression is almost not affected by the compression level.
Some(async_compression::Level::Best),
&ctx,
)
.await?;

encoder.shutdown().await?;
writer.flush().await?;
writer.into_inner().sync_all().await?;
@@ -13,7 +13,6 @@ use std::time::{Duration, Instant, SystemTime};
use std::{io, str};

use anyhow::{Context as _, anyhow, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::{Buf as _, BufMut as _, BytesMut};
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
@@ -2613,6 +2612,7 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
None,
&ctx,
)
.await?;
@@ -2641,31 +2641,6 @@ impl PageServerHandler {
.map_err(|err| {
BasebackupError::Client(err, "handle_basebackup_request,cached,copy")
})?;
} else if gzip {
let mut encoder = GzipEncoder::with_quality(
&mut writer,
// NOTE using fast compression because it's on the critical path
// for compute startup. For an empty database, we get
// <100KB with this method. The Level::Best compression method
// gives us <20KB, but maybe we should add basebackup caching
// on compute shutdown first.
async_compression::Level::Fastest,
);
basebackup::send_basebackup_tarball(
&mut encoder,
&timeline,
lsn,
prev_lsn,
full_backup,
replica,
&ctx,
)
.await?;
// shutdown the encoder to ensure the gzip footer is written
encoder
.shutdown()
.await
.map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
} else {
basebackup::send_basebackup_tarball(
&mut writer,
@@ -2674,6 +2649,11 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
// NB: using fast compression because it's on the critical path for compute
// startup. For an empty database, we get <100KB with this method. The
// Level::Best compression method gives us <20KB, but maybe we should add
// basebackup caching on compute shutdown first.
gzip.then_some(async_compression::Level::Fastest),
&ctx,
)
.await?;
@@ -3553,7 +3533,7 @@ impl proto::PageService for GrpcPageServiceHandler {
if timeline.is_archived() == Some(true) {
return Err(tonic::Status::failed_precondition("timeline is archived"));
}
let req: page_api::GetBaseBackupRequest = req.into_inner().into();
let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;

span_record!(lsn=?req.lsn);
@@ -3579,6 +3559,15 @@ impl proto::PageService for GrpcPageServiceHandler {
let span = Span::current();
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
let jh = tokio::spawn(async move {
let gzip_level = match req.compression {
page_api::BaseBackupCompression::None => None,
// NB: using fast compression because it's on the critical path for compute
// startup. For an empty database, we get <100KB with this method. The
// Level::Best compression method gives us <20KB, but maybe we should add
// basebackup caching on compute shutdown first.
page_api::BaseBackupCompression::Gzip => Some(async_compression::Level::Fastest),
};

let result = basebackup::send_basebackup_tarball(
&mut simplex_write,
&timeline,
@@ -3586,6 +3575,7 @@ impl proto::PageService for GrpcPageServiceHandler {
None,
req.full,
req.replica,
gzip_level,
&ctx,
)
.instrument(span) // propagate request span