diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto
index 81953a710f..d06b2cfca5 100644
--- a/pageserver/page_api/proto/page_service.proto
+++ b/pageserver/page_api/proto/page_service.proto
@@ -110,6 +110,19 @@ message GetBaseBackupRequest {
   bool replica = 2;
   // If true, include relation files in the base backup. Mainly for debugging and tests.
   bool full = 3;
+  // Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
+  // compression, so that we can cache compressed backups on the server.
+  BaseBackupCompression compression = 4;
+}
+
+// Base backup compression algorithms.
+enum BaseBackupCompression {
+  // Unknown algorithm. Used when clients send an unsupported algorithm.
+  BASE_BACKUP_COMPRESSION_UNKNOWN = 0;
+  // No compression.
+  BASE_BACKUP_COMPRESSION_NONE = 1;
+  // GZIP compression.
+  BASE_BACKUP_COMPRESSION_GZIP = 2;
 }
 
 // Base backup response chunk, returned as an ordered stream.
diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs
index 3977ce7c23..71d539ab91 100644
--- a/pageserver/page_api/src/client.rs
+++ b/pageserver/page_api/src/client.rs
@@ -95,7 +95,6 @@ impl Client {
 
         if let Some(compression) = compression {
             // TODO: benchmark this (including network latency).
-            // TODO: consider enabling compression by default.
             client = client
                 .accept_compressed(compression)
                 .send_compressed(compression);
diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs
index 6efa742799..1ca89b4870 100644
--- a/pageserver/page_api/src/model.rs
+++ b/pageserver/page_api/src/model.rs
@@ -191,15 +191,21 @@ pub struct GetBaseBackupRequest {
     pub replica: bool,
     /// If true, include relation files in the base backup. Mainly for debugging and tests.
     pub full: bool,
+    /// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
+    /// compression, so that we can cache compressed backups on the server.
+    pub compression: BaseBackupCompression,
 }
 
-impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
-    fn from(pb: proto::GetBaseBackupRequest) -> Self {
-        Self {
+impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
+    type Error = ProtocolError;
+
+    fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
+        Ok(Self {
             lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
             replica: pb.replica,
             full: pb.full,
-        }
+            compression: pb.compression.try_into()?,
+        })
     }
 }
 
@@ -209,10 +215,55 @@ impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
             lsn: request.lsn.unwrap_or_default().0,
             replica: request.replica,
             full: request.full,
+            compression: request.compression.into(),
         }
     }
 }
 
+/// Base backup compression algorithm.
+#[derive(Clone, Copy, Debug)]
+pub enum BaseBackupCompression {
+    None,
+    Gzip,
+}
+
+impl TryFrom<proto::BaseBackupCompression> for BaseBackupCompression {
+    type Error = ProtocolError;
+
+    fn try_from(pb: proto::BaseBackupCompression) -> Result<Self, Self::Error> {
+        match pb {
+            proto::BaseBackupCompression::Unknown => Err(ProtocolError::invalid("compression", pb)),
+            proto::BaseBackupCompression::None => Ok(Self::None),
+            proto::BaseBackupCompression::Gzip => Ok(Self::Gzip),
+        }
+    }
+}
+
+impl TryFrom<i32> for BaseBackupCompression {
+    type Error = ProtocolError;
+
+    fn try_from(compression: i32) -> Result<Self, Self::Error> {
+        proto::BaseBackupCompression::try_from(compression)
+            .map_err(|_| ProtocolError::invalid("compression", compression))
+            .and_then(Self::try_from)
+    }
+}
+
+impl From<BaseBackupCompression> for proto::BaseBackupCompression {
+    fn from(compression: BaseBackupCompression) -> Self {
+        match compression {
+            BaseBackupCompression::None => Self::None,
+            BaseBackupCompression::Gzip => Self::Gzip,
+        }
+    }
+}
+
+impl From<BaseBackupCompression> for i32 {
+    fn from(compression: BaseBackupCompression) -> Self {
+        proto::BaseBackupCompression::from(compression).into()
+    }
+}
+
 pub type GetBaseBackupResponseChunk = Bytes;
 
 impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
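Note (not part of the patch): since compression now rides in the request payload rather than the gRPC channel, decompression becomes the client's job. Below is a minimal sketch of the intended client flow, modeled on the pagebench client further down; the stream-adapter details (`map_err`, `StreamReader`) and the helper name are assumptions, not code from this PR.

```rust
use async_compression::tokio::bufread::GzipDecoder;
use futures::TryStreamExt as _;
use tokio::io::AsyncReadExt as _;
use tokio_util::io::StreamReader;

/// Hypothetical helper: fetch a base backup and return the decompressed tarball.
async fn fetch_base_backup(client: &mut page_api::Client) -> anyhow::Result<Vec<u8>> {
    let req = page_api::GetBaseBackupRequest {
        lsn: None, // let the server pick the latest LSN
        replica: false,
        full: false,
        compression: page_api::BaseBackupCompression::Gzip,
    };
    // The server streams gzip-compressed chunks; adapt the chunk stream into
    // an AsyncBufRead and decompress client-side.
    let stream = client.get_base_backup(req).await?;
    let reader = StreamReader::new(stream.map_err(std::io::Error::other));
    let mut tarball = Vec::new();
    GzipDecoder::new(reader).read_to_end(&mut tarball).await?;
    Ok(tarball)
}
```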
diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs
index e028174c1d..4111d09f92 100644
--- a/pageserver/pagebench/src/cmd/basebackup.rs
+++ b/pageserver/pagebench/src/cmd/basebackup.rs
@@ -317,6 +317,7 @@ impl Client for LibpqClient {
 /// A gRPC Pageserver client.
 struct GrpcClient {
     inner: page_api::Client,
+    compression: page_api::BaseBackupCompression,
 }
 
 impl GrpcClient {
@@ -331,10 +332,14 @@ impl GrpcClient {
             ttid.timeline_id,
             ShardIndex::unsharded(),
             None,
-            compression.then_some(tonic::codec::CompressionEncoding::Zstd),
+            None, // NB: uses payload compression
         )
         .await?;
-        Ok(Self { inner })
+        let compression = match compression {
+            true => page_api::BaseBackupCompression::Gzip,
+            false => page_api::BaseBackupCompression::None,
+        };
+        Ok(Self { inner, compression })
     }
 }
 
@@ -348,6 +353,7 @@ impl Client for GrpcClient {
             lsn,
             replica: false,
             full: false,
+            compression: self.compression,
         };
         let stream = self.inner.get_base_backup(req).await?;
         Ok(Box::pin(StreamReader::new(
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 115f0d9ebc..36dada1e89 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -14,6 +14,7 @@ use std::fmt::Write as FmtWrite;
 use std::time::{Instant, SystemTime};
 
 use anyhow::{Context, anyhow};
+use async_compression::tokio::write::GzipEncoder;
 use bytes::{BufMut, Bytes, BytesMut};
 use fail::fail_point;
 use pageserver_api::key::{Key, rel_block_to_key};
@@ -25,8 +26,7 @@ use postgres_ffi::{
 };
 use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
 use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM};
-use tokio::io;
-use tokio::io::AsyncWrite;
+use tokio::io::{self, AsyncWrite, AsyncWriteExt as _};
 use tokio_tar::{Builder, EntryType, Header};
 use tracing::*;
 use utils::lsn::Lsn;
@@ -97,6 +97,7 @@ impl From<BasebackupError> for tonic::Status {
 /// * When working without safekeepers. In this situation it is important to match the lsn
 ///   we are taking basebackup on with the lsn that is used in pageserver's walreceiver
 ///   to start the replication.
+#[allow(clippy::too_many_arguments)]
 pub async fn send_basebackup_tarball<'a, W>(
     write: &'a mut W,
     timeline: &'a Timeline,
@@ -104,6 +105,7 @@ pub async fn send_basebackup_tarball<'a, W>(
     prev_lsn: Option<Lsn>,
     full_backup: bool,
     replica: bool,
+    gzip_level: Option<async_compression::Level>,
     ctx: &'a RequestContext,
 ) -> Result<(), BasebackupError>
 where
@@ -122,7 +124,7 @@ where
     // prev_lsn value; that happens if the timeline was just branched from
     // an old LSN and it doesn't have any WAL of its own yet. We will set
    // prev_lsn to Lsn(0) if we cannot provide the correct value.
-    let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
+    let (backup_prev, lsn) = if let Some(req_lsn) = req_lsn {
         // Backup was requested at a particular LSN. The caller should've
         // already checked that it's a valid LSN.
@@ -143,7 +145,7 @@
     };
 
     // Consolidate the derived and the provided prev_lsn values
-    let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
+    let prev_record_lsn = if let Some(provided_prev_lsn) = prev_lsn {
         if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
             return Err(BasebackupError::Server(anyhow!(
                 "backup_prev {backup_prev} != provided_prev_lsn {provided_prev_lsn}"
@@ -155,30 +157,55 @@
     };
 
     info!(
-        "taking basebackup lsn={}, prev_lsn={} (full_backup={}, replica={})",
-        backup_lsn, prev_lsn, full_backup, replica
+        "taking basebackup lsn={lsn}, prev_lsn={prev_record_lsn} \
+        (full_backup={full_backup}, replica={replica}, gzip={gzip_level:?})",
     );
+    let span = info_span!("send_tarball", backup_lsn=%lsn);
+
+    let io_concurrency = IoConcurrency::spawn_from_conf(
+        timeline.conf.get_vectored_concurrent_io,
+        timeline
+            .gate
+            .enter()
+            .map_err(|_| BasebackupError::Shutdown)?,
+    );
 
-    let basebackup = Basebackup {
-        ar: Builder::new_non_terminated(write),
-        timeline,
-        lsn: backup_lsn,
-        prev_record_lsn: prev_lsn,
-        full_backup,
-        replica,
-        ctx,
-        io_concurrency: IoConcurrency::spawn_from_conf(
-            timeline.conf.get_vectored_concurrent_io,
-            timeline
-                .gate
-                .enter()
-                .map_err(|_| BasebackupError::Shutdown)?,
-        ),
-    };
-    basebackup
+    if let Some(gzip_level) = gzip_level {
+        let mut encoder = GzipEncoder::with_quality(write, gzip_level);
+        Basebackup {
+            ar: Builder::new_non_terminated(&mut encoder),
+            timeline,
+            lsn,
+            prev_record_lsn,
+            full_backup,
+            replica,
+            ctx,
+            io_concurrency,
+        }
         .send_tarball()
-        .instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
-        .await
+        .instrument(span)
+        .await?;
+        encoder
+            .shutdown()
+            .await
+            .map_err(|err| BasebackupError::Client(err, "gzip"))?;
+    } else {
+        Basebackup {
+            ar: Builder::new_non_terminated(write),
+            timeline,
+            lsn,
+            prev_record_lsn,
+            full_backup,
+            replica,
+            ctx,
+            io_concurrency,
+        }
+        .send_tarball()
+        .instrument(span)
+        .await?;
+    }
+
+    Ok(())
 }
 
 /// This is short-living object only for the time of tarball creation,
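Note (not part of the patch): the `encoder.shutdown()` call after `send_tarball()` is load-bearing. `async_compression`'s write-side encoders only emit the gzip trailer (CRC32 and size) on shutdown, so skipping it yields a truncated stream that decompressors reject. A standalone illustration of that behavior:

```rust
use async_compression::tokio::write::GzipEncoder;
use tokio::io::AsyncWriteExt as _;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut compressed = Vec::new();
    let mut encoder = GzipEncoder::with_quality(&mut compressed, async_compression::Level::Fastest);
    encoder.write_all(b"tarball bytes").await?;
    // flush() alone writes buffered data but not the trailer; shutdown()
    // finalizes the gzip member so the output is a complete, valid stream.
    encoder.shutdown().await?;
    assert!(compressed.len() >= 18); // 10-byte header + 8-byte trailer minimum
    Ok(())
}
```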
diff --git a/pageserver/src/basebackup_cache.rs b/pageserver/src/basebackup_cache.rs
index 24f6413380..69438dae7f 100644
--- a/pageserver/src/basebackup_cache.rs
+++ b/pageserver/src/basebackup_cache.rs
@@ -1,7 +1,6 @@
 use std::{collections::HashMap, sync::Arc};
 
 use anyhow::Context;
-use async_compression::tokio::write::GzipEncoder;
 use camino::{Utf8Path, Utf8PathBuf};
 use metrics::core::{AtomicU64, GenericCounter};
 use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
@@ -594,13 +593,6 @@ impl BackgroundTask {
         let file = tokio::fs::File::create(entry_tmp_path).await?;
         let mut writer = BufWriter::new(file);
 
-        let mut encoder = GzipEncoder::with_quality(
-            &mut writer,
-            // Level::Best because compression is not on the hot path of basebackup requests.
-            // The decompression is almost not affected by the compression level.
-            async_compression::Level::Best,
-        );
-
         // We may receive a request before the WAL record is applied to the timeline.
         // Wait for the requested LSN to be applied.
         timeline
@@ -613,17 +605,19 @@ impl BackgroundTask {
             .await?;
 
         send_basebackup_tarball(
-            &mut encoder,
+            &mut writer,
             timeline,
             Some(req_lsn),
             None,
             false,
             false,
+            // Level::Best because compression is not on the hot path of basebackup requests.
+            // The decompression is almost not affected by the compression level.
+            Some(async_compression::Level::Best),
             &ctx,
         )
         .await?;
 
-        encoder.shutdown().await?;
         writer.flush().await?;
         writer.into_inner().sync_all().await?;
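Note (not part of the patch): moving the encoder inside `send_basebackup_tarball` means the cache file on disk is already a finished gzip stream, which is what lets the cached branch in page_service.rs below (`handle_basebackup_request,cached,copy`) serve it as a plain byte copy with no encoder on the request path. Roughly, with hypothetical names:

```rust
use tokio::io::{AsyncWrite, AsyncWriteExt as _};

/// Hypothetical sketch of the cached path: stream the pre-compressed
/// file to the client verbatim, no recompression on the hot path.
async fn serve_cached_backup(
    path: &std::path::Path,
    client: &mut (impl AsyncWrite + Unpin),
) -> std::io::Result<u64> {
    let mut file = tokio::fs::File::open(path).await?;
    let copied = tokio::io::copy(&mut file, client).await?;
    client.flush().await?;
    Ok(copied)
}
```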
The + // Level::Best compression method gives us <20KB, but maybe we should add + // basebackup caching on compute shutdown first. + page_api::BaseBackupCompression::Gzip => Some(async_compression::Level::Fastest), + }; + let result = basebackup::send_basebackup_tarball( &mut simplex_write, &timeline, @@ -3586,6 +3575,7 @@ impl proto::PageService for GrpcPageServiceHandler { None, req.full, req.replica, + gzip_level, &ctx, ) .instrument(span) // propagate request span