From 92aee7e07f347a0cc125462705811963ab5c78e9 Mon Sep 17 00:00:00 2001
From: bojanserafimov
Date: Tue, 11 Jul 2023 13:11:23 -0400
Subject: [PATCH] cold starts: basebackup compression (#4482)

Co-authored-by: Alex Chi Z
---
 Cargo.lock                              | 38 +++++++++++++-
 Cargo.toml                              |  2 +
 compute_tools/Cargo.toml                |  2 +
 compute_tools/src/compute.rs            | 44 ++++++++++++++--
 libs/compute_api/src/responses.rs       |  1 +
 libs/utils/src/measured_stream.rs       | 32 ++++++++++++
 pageserver/Cargo.toml                   |  2 +
 pageserver/src/page_service.rs          | 69 +++++++++++++++++++++++--
 test_runner/performance/test_startup.py |  5 ++
 9 files changed, 185 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7b5539bdf5..45f5acce7d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -158,6 +158,19 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "async-compression"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b0122885821398cc923ece939e24d1056a2384ee719432397fa9db87230ff11"
+dependencies = [
+ "flate2",
+ "futures-core",
+ "memchr",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "async-stream"
 version = "0.3.5"
@@ -593,7 +606,7 @@ dependencies = [
  "cc",
  "cfg-if",
  "libc",
- "miniz_oxide",
+ "miniz_oxide 0.6.2",
  "object",
  "rustc-demangle",
 ]
@@ -882,9 +895,11 @@ name = "compute_tools"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-compression",
  "chrono",
  "clap",
  "compute_api",
+ "flate2",
  "futures",
  "hyper",
  "notify",
@@ -1367,6 +1382,16 @@ version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
 
+[[package]]
+name = "flate2"
+version = "1.0.26"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
+dependencies = [
+ "crc32fast",
+ "miniz_oxide 0.7.1",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -2151,6 +2176,15 @@ dependencies = [
  "adler",
 ]
 
+[[package]]
+name = "miniz_oxide"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
+dependencies = [
+ "adler",
+]
+
 [[package]]
 name = "mio"
 version = "0.8.6"
@@ -2482,6 +2516,7 @@ name = "pageserver"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-compression",
  "async-stream",
  "async-trait",
  "byteorder",
@@ -2498,6 +2533,7 @@ dependencies = [
  "enum-map",
  "enumset",
  "fail",
+ "flate2",
  "futures",
  "git-version",
  "hex",
diff --git a/Cargo.toml b/Cargo.toml
index f36e8f6569..6d35334deb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,6 +32,8 @@ license = "Apache-2.0"
 ## All dependency versions, used in the project
 [workspace.dependencies]
 anyhow = { version = "1.0", features = ["backtrace"] }
+async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
+flate2 = "1.0.26"
 async-stream = "0.3"
 async-trait = "0.1"
 aws-config = { version = "0.55", default-features = false, features=["rustls"] }
diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml
index 21226249cf..f8f8f729ce 100644
--- a/compute_tools/Cargo.toml
+++ b/compute_tools/Cargo.toml
@@ -6,8 +6,10 @@ license.workspace = true
 
 [dependencies]
 anyhow.workspace = true
+async-compression.workspace = true
 chrono.workspace = true
 clap.workspace = true
+flate2.workspace = true
 futures.workspace = true
 hyper = { workspace = true, features = ["full"] }
 notify.workspace = true
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 9fcdae73a4..38f3b53f65 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -1,4 +1,5 @@
 use std::fs;
+use std::io::BufRead;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
 use std::process::{Command, Stdio};
@@ -15,6 +16,7 @@ use utils::lsn::Lsn;
 
 use compute_api::responses::{ComputeMetrics, ComputeStatus};
 use compute_api::spec::{ComputeMode, ComputeSpec};
+use utils::measured_stream::MeasuredReader;
 
 use crate::config;
 use crate::pg_helpers::*;
@@ -253,20 +255,52 @@ impl ComputeNode {
         let mut client = config.connect(NoTls)?;
 
         let basebackup_cmd = match lsn {
-            Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id), // First start of the compute
-            _ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
+            // HACK We don't use compression on first start (Lsn(0)) because there's no API for it
+            Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id),
+            _ => format!(
+                "basebackup {} {} {} --gzip",
+                spec.tenant_id, spec.timeline_id, lsn
+            ),
         };
+
         let copyreader = client.copy_out(basebackup_cmd.as_str())?;
+        let mut measured_reader = MeasuredReader::new(copyreader);
+
+        // Check the magic number to see if it's a gzip or not. Even though
+        // we might explicitly ask for gzip, an old pageserver with no implementation
+        // of gzip compression might send us uncompressed data. After some time
+        // passes we can assume all pageservers know how to compress and we can
+        // delete this check.
+        //
+        // If the data is not gzip, it will be tar. It will not be mistakenly
+        // recognized as gzip because tar starts with an ascii encoding of a filename,
+        // and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
+        // we send the "global" directory first from the pageserver, so it definitely
+        // won't be recognized as gzip.
+        let mut bufreader = std::io::BufReader::new(&mut measured_reader);
+        let gzip = {
+            let peek = bufreader.fill_buf().unwrap();
+            peek[0] == 0x1f && peek[1] == 0x8b
+        };
 
         // Read the archive directly from the `CopyOutReader`
        //
        // Set `ignore_zeros` so that unpack() reads all the Copy data and
        // doesn't stop at the end-of-archive marker. Otherwise, if the server
        // sends an Error after finishing the tarball, we will not notice it.
-        let mut ar = tar::Archive::new(copyreader);
-        ar.set_ignore_zeros(true);
-        ar.unpack(&self.pgdata)?;
+        if gzip {
+            let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
+            ar.set_ignore_zeros(true);
+            ar.unpack(&self.pgdata)?;
+        } else {
+            let mut ar = tar::Archive::new(&mut bufreader);
+            ar.set_ignore_zeros(true);
+            ar.unpack(&self.pgdata)?;
+        };
 
+        // Report metrics
+        self.state.lock().unwrap().metrics.basebackup_bytes =
+            measured_reader.get_byte_count() as u64;
         self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
             .signed_duration_since(start_time)
             .to_std()
diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs
index 80e5341216..6124c81f50 100644
--- a/libs/compute_api/src/responses.rs
+++ b/libs/compute_api/src/responses.rs
@@ -71,6 +71,7 @@ pub struct ComputeMetrics {
     pub wait_for_spec_ms: u64,
     pub sync_safekeepers_ms: u64,
     pub basebackup_ms: u64,
+    pub basebackup_bytes: u64,
     pub start_postgres_ms: u64,
     pub config_ms: u64,
     pub total_startup_ms: u64,
diff --git a/libs/utils/src/measured_stream.rs b/libs/utils/src/measured_stream.rs
index c37d686a1d..c82fc13109 100644
--- a/libs/utils/src/measured_stream.rs
+++ b/libs/utils/src/measured_stream.rs
@@ -1,4 +1,5 @@
 use pin_project_lite::pin_project;
+use std::io::Read;
 use std::pin::Pin;
 use std::{io, task};
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
@@ -75,3 +76,34 @@ impl AsyncWrite for MeasuredStream {
+
+pub struct MeasuredReader<R> {
+    inner: R,
+    byte_count: usize,
+}
+
+impl<R> MeasuredReader<R> {
+    pub fn new(reader: R) -> Self {
+        Self {
+            inner: reader,
+            byte_count: 0,
+        }
+    }
+
+    pub fn get_byte_count(&self) -> usize {
+        self.byte_count
+    }
+}
+
+impl<R: Read> Read for MeasuredReader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let result = self.inner.read(buf);
+        if let Ok(n_bytes) = result {
+            self.byte_count += n_bytes
+        }
+        result
+    }
+}
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index ea81544cbe..9381ed0bfa 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
 
 [dependencies]
 anyhow.workspace = true
+async-compression.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
 byteorder.workspace = true
@@ -24,6 +25,7 @@ consumption_metrics.workspace = true
 crc32c.workspace = true
 crossbeam-utils.workspace = true
 either.workspace = true
+flate2.workspace = true
 fail.workspace = true
 futures.workspace = true
 git-version.workspace = true
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 118b0c0bae..35dd5ecdb5 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -10,6 +10,7 @@
 //
 
 use anyhow::Context;
+use async_compression::tokio::write::GzipEncoder;
 use bytes::Buf;
 use bytes::Bytes;
 use futures::Stream;
@@ -31,6 +32,7 @@ use std::str;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::io::AsyncWriteExt;
 use tokio::io::{AsyncRead, AsyncWrite};
 use tokio_util::io::StreamReader;
 use tracing::field;
@@ -756,6 +758,7 @@ impl PageServerHandler {
         lsn: Option<Lsn>,
         prev_lsn: Option<Lsn>,
         full_backup: bool,
+        gzip: bool,
         ctx: RequestContext,
     ) -> anyhow::Result<()>
     where
@@ -783,8 +786,9 @@ impl PageServerHandler {
         pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
         pgb.flush().await?;
 
-        // Send a tarball of the latest layer on the timeline
-        {
+        // Send a tarball of the latest layer on the timeline. Compress if not
+        // fullbackup. TODO Compress in that case too (tests need to be updated)
+        if full_backup {
             let mut writer = pgb.copyout_writer();
             basebackup::send_basebackup_tarball(
                 &mut writer,
                 &timeline,
                 lsn,
                 prev_lsn,
                 full_backup,
                 &ctx,
             )
             .await?;
+        } else {
+            let mut writer = pgb.copyout_writer();
+            if gzip {
+                let mut encoder = GzipEncoder::with_quality(
+                    writer,
+                    // NOTE using fast compression because it's on the critical path
+                    // for compute startup. For an empty database, we get
+                    // <100KB with this method. The Level::Best compression method
+                    // gives us <20KB, but maybe we should add basebackup caching
+                    // on compute shutdown first.
+                    async_compression::Level::Fastest,
+                );
+                basebackup::send_basebackup_tarball(
+                    &mut encoder,
+                    &timeline,
+                    lsn,
+                    prev_lsn,
+                    full_backup,
+                    &ctx,
+                )
+                .await?;
+                // shutdown the encoder to ensure the gzip footer is written
+                encoder.shutdown().await?;
+            } else {
+                basebackup::send_basebackup_tarball(
+                    &mut writer,
+                    &timeline,
+                    lsn,
+                    prev_lsn,
+                    full_backup,
+                    &ctx,
+                )
+                .await?;
+            }
         }
 
         pgb.write_message_noflush(&BeMessage::CopyDone)?;
@@ -933,6 +971,19 @@ where
                 None
             };
 
+            let gzip = if params.len() >= 4 {
+                if params[3] == "--gzip" {
+                    true
+                } else {
+                    return Err(QueryError::Other(anyhow::anyhow!(
+                        "Parameter in position 3 unknown {}",
+                        params[3],
+                    )));
+                }
+            } else {
+                false
+            };
+
             metrics::metric_vec_duration::observe_async_block_duration_by_result(
                 &*crate::metrics::BASEBACKUP_QUERY_TIME,
                 async move {
@@ -943,6 +994,7 @@ where
                         lsn,
                         None,
                         false,
+                        gzip,
                         ctx,
                     )
                     .await?;
@@ -1028,8 +1080,17 @@ where
             self.check_permission(Some(tenant_id))?;
 
             // Check that the timeline exists
-            self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true, ctx)
-                .await?;
+            self.handle_basebackup_request(
+                pgb,
+                tenant_id,
+                timeline_id,
+                lsn,
+                prev_lsn,
+                true,
+                false,
+                ctx,
+            )
+            .await?;
             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
         } else if query_string.starts_with("import basebackup ") {
             // Import the `base` section (everything but the wal) of a basebackup.
diff --git a/test_runner/performance/test_startup.py b/test_runner/performance/test_startup.py
index 4744c1ed2e..d897df1bcb 100644
--- a/test_runner/performance/test_startup.py
+++ b/test_runner/performance/test_startup.py
@@ -60,6 +60,11 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
             value = metrics[key]
             zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
 
+        # Check basebackup size makes sense
+        basebackup_bytes = metrics["basebackup_bytes"]
+        if i > 0:
+            assert basebackup_bytes < 100 * 1024
+
         # Stop so we can restart
         endpoint.stop()
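
Note (illustrative sketch, not part of the patch): the compute side above peeks at the
first two bytes of the COPY stream and only engages the gzip decoder when it sees the
0x1f 0x8b magic number, so an old pageserver that ignores --gzip still works. A minimal
standalone version of that fallback, assuming the tar, flate2 and anyhow crates already
in this workspace; the helper name is hypothetical:

    use std::io::{BufRead, BufReader, Read};
    use std::path::Path;

    /// Unpack `input` into `dest`, accepting either a gzip-compressed tar
    /// stream or a plain tar stream.
    fn unpack_tar_maybe_gzip(input: impl Read, dest: &Path) -> anyhow::Result<()> {
        let mut reader = BufReader::new(input);
        // fill_buf() only peeks at buffered bytes; nothing is consumed, so the
        // magic number is still part of the archive handed to tar below.
        let is_gzip = {
            let peek = reader.fill_buf()?;
            peek.len() >= 2 && peek[0] == 0x1f && peek[1] == 0x8b
        };
        if is_gzip {
            let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(reader));
            ar.set_ignore_zeros(true);
            ar.unpack(dest)?;
        } else {
            let mut ar = tar::Archive::new(reader);
            ar.set_ignore_zeros(true);
            ar.unpack(dest)?;
        }
        Ok(())
    }

Unlike the patch, the sketch checks peek.len() >= 2 before indexing, which avoids a
panic if the stream ends before two bytes arrive.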
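
Note (illustrative sketch, not part of the patch): on the pageserver side the tarball is
wrapped in async-compression's GzipEncoder with Level::Fastest, and shutdown() must be
awaited so the gzip footer is flushed. A minimal standalone version of that pattern,
assuming only the async-compression and tokio crates the patch already depends on; the
helper name is hypothetical:

    use async_compression::tokio::write::GzipEncoder;
    use tokio::io::{AsyncWrite, AsyncWriteExt};

    /// Gzip-compress `payload` into `dst` using the fast compression level.
    async fn write_gzipped(dst: impl AsyncWrite + Unpin, payload: &[u8]) -> std::io::Result<()> {
        let mut encoder = GzipEncoder::with_quality(dst, async_compression::Level::Fastest);
        encoder.write_all(payload).await?;
        // Without shutdown() the gzip trailer (CRC and size) is never written,
        // and decompression on the receiving side fails with a truncated stream.
        encoder.shutdown().await?;
        Ok(())
    }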