Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-17 10:22:56 +00:00

Compare commits: `hackathon/...basebackup` (6 commits)
- 2a6e2249bd
- 1815ae72f8
- 46c89c4190
- 1fc5c23c01
- ddb98d6f77
- 45b71fecec
The changes below gzip the basebackup tarball with `flate2` on the pageserver side, decompress it on the compute side, and record the compressed size in a new `basebackup_bytes` metric.

`Cargo.lock` (generated). Adding `flate2` pulls in `miniz_oxide` 0.7.1 alongside the 0.6.2 copy used by `backtrace`, which is why the existing entries gain version suffixes:
```diff
@@ -604,7 +604,7 @@ dependencies = [
  "cc",
  "cfg-if",
  "libc",
- "miniz_oxide",
+ "miniz_oxide 0.6.2",
  "object",
  "rustc-demangle",
 ]
@@ -917,6 +917,7 @@ dependencies = [
  "chrono",
  "clap 4.3.0",
  "compute_api",
+ "flate2",
  "futures",
  "hyper",
  "notify",
@@ -1399,6 +1400,16 @@ version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
 
+[[package]]
+name = "flate2"
+version = "1.0.26"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
+dependencies = [
+ "crc32fast",
+ "miniz_oxide 0.7.1",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -2189,6 +2200,15 @@ dependencies = [
  "adler",
 ]
 
+[[package]]
+name = "miniz_oxide"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
+dependencies = [
+ "adler",
+]
+
 [[package]]
 name = "mio"
 version = "0.8.6"
@@ -2558,6 +2578,7 @@ dependencies = [
  "enum-map",
  "enumset",
  "fail",
+ "flate2",
  "futures",
  "git-version",
  "hex",
```
Workspace `Cargo.toml`:

```diff
@@ -32,6 +32,7 @@ license = "Apache-2.0"
 ## All dependency versions, used in the project
 [workspace.dependencies]
 anyhow = { version = "1.0", features = ["backtrace"] }
+flate2 = "1.0.26"
 async-stream = "0.3"
 async-trait = "0.1"
 atty = "0.2.14"
```
`compute_tools/Cargo.toml`:

```diff
@@ -6,6 +6,7 @@ license.workspace = true
 
 [dependencies]
 anyhow.workspace = true
+flate2.workspace = true
 chrono.workspace = true
 clap.workspace = true
 futures.workspace = true
```
`compute_tools/src/compute.rs`:

```diff
@@ -15,6 +15,7 @@ use utils::lsn::Lsn;
 
 use compute_api::responses::{ComputeMetrics, ComputeStatus};
 use compute_api::spec::{ComputeMode, ComputeSpec};
+use utils::measured_stream::MeasuredReader;
 
 use crate::config;
 use crate::pg_helpers::*;
@@ -179,16 +180,21 @@ impl ComputeNode {
             _ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
         };
         let copyreader = client.copy_out(basebackup_cmd.as_str())?;
+        let mut measured_reader = MeasuredReader::new(copyreader);
+        let mut decoder = flate2::read::GzDecoder::new(&mut measured_reader);
+
         // Read the archive directly from the `CopyOutReader`
         //
         // Set `ignore_zeros` so that unpack() reads all the Copy data and
         // doesn't stop at the end-of-archive marker. Otherwise, if the server
         // sends an Error after finishing the tarball, we will not notice it.
-        let mut ar = tar::Archive::new(copyreader);
+        let mut ar = tar::Archive::new(&mut decoder);
         ar.set_ignore_zeros(true);
         ar.unpack(&self.pgdata)?;
 
         // Report metrics
+        self.state.lock().unwrap().metrics.basebackup_bytes =
+            measured_reader.get_byte_count() as u64;
         self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
             .signed_duration_since(start_time)
             .to_std()
```
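Taken together, the compute-side pipeline is COPY stream → byte counter → gzip decoder → tar unpacker, so `basebackup_bytes` records compressed bytes on the wire rather than the unpacked size. A minimal standalone sketch of the same composition, with stdin standing in for the `copy_out` reader and a scratch directory standing in for `pgdata` (both are assumptions of this sketch):

```rust
use flate2::read::GzDecoder;
use tar::Archive;

fn main() -> std::io::Result<()> {
    // Stand-in for the Postgres COPY-out reader.
    let stdin = std::io::stdin();
    let mut decoder = GzDecoder::new(stdin.lock());

    let mut ar = Archive::new(&mut decoder);
    // As in the diff: keep reading past the end-of-archive marker, so an
    // error the server sends after the tarball is still noticed.
    ar.set_ignore_zeros(true);
    ar.unpack("./pgdata-sketch")?;
    Ok(())
}
```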
`libs/compute_api/src/responses.rs`:

```diff
@@ -71,6 +71,7 @@ pub struct ComputeMetrics {
     pub wait_for_spec_ms: u64,
     pub sync_safekeepers_ms: u64,
     pub basebackup_ms: u64,
+    pub basebackup_bytes: u64,
     pub config_ms: u64,
     pub total_startup_ms: u64,
 }
```
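The new field surfaces through the compute's `/metrics.json` endpoint, which the test at the bottom reads. A sketch of the payload shape, assuming the struct derives `serde::Serialize` (as the JSON endpoint implies); the trimmed struct here is a stand-in, not the full definition:

```rust
use serde::Serialize;

// Trimmed stand-in for ComputeMetrics; field names match the diff.
#[derive(Serialize, Default)]
struct ComputeMetrics {
    sync_safekeepers_ms: u64,
    basebackup_ms: u64,
    basebackup_bytes: u64,
    total_startup_ms: u64,
}

fn main() {
    let m = ComputeMetrics {
        basebackup_ms: 120,
        basebackup_bytes: 61_440,
        ..Default::default()
    };
    // Prints e.g. {"sync_safekeepers_ms":0,"basebackup_ms":120,"basebackup_bytes":61440,...}
    println!("{}", serde_json::to_string(&m).unwrap());
}
```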
`libs/utils/src/measured_stream.rs`:

```diff
@@ -1,4 +1,5 @@
 use pin_project_lite::pin_project;
+use std::io::Read;
 use std::pin::Pin;
 use std::{io, task};
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
@@ -75,3 +76,34 @@ impl<S: AsyncWrite + Unpin, R, W: FnMut(usize)> AsyncWrite for MeasuredStream<S,
         self.project().stream.poll_shutdown(context)
     }
 }
+
+/// Wrapper for a reader that counts bytes read.
+///
+/// Similar to MeasuredStream, but one-way and sync.
+pub struct MeasuredReader<R: Read> {
+    inner: R,
+    byte_count: usize,
+}
+
+impl<R: Read> MeasuredReader<R> {
+    pub fn new(reader: R) -> Self {
+        Self {
+            inner: reader,
+            byte_count: 0,
+        }
+    }
+
+    pub fn get_byte_count(&self) -> usize {
+        self.byte_count
+    }
+}
+
+impl<R: Read> Read for MeasuredReader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let result = self.inner.read(buf);
+        if let Ok(n_bytes) = result {
+            self.byte_count += n_bytes
+        }
+        result
+    }
+}
```
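A quick usage sketch for the new reader. The type is copied inline in condensed form so the sketch compiles standalone, since `MeasuredReader` lives in the internal `utils` crate:

```rust
use std::io::{Cursor, Read};

// Condensed copy of the MeasuredReader above.
struct MeasuredReader<R: Read> {
    inner: R,
    byte_count: usize,
}

impl<R: Read> MeasuredReader<R> {
    fn new(inner: R) -> Self {
        Self { inner, byte_count: 0 }
    }
    fn get_byte_count(&self) -> usize {
        self.byte_count
    }
}

impl<R: Read> Read for MeasuredReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let result = self.inner.read(buf);
        if let Ok(n) = result {
            // Only successfully delivered bytes are counted; errors leave the total unchanged.
            self.byte_count += n;
        }
        result
    }
}

fn main() -> std::io::Result<()> {
    let mut reader = MeasuredReader::new(Cursor::new(vec![7u8; 4096]));
    std::io::copy(&mut reader, &mut std::io::sink())?;
    assert_eq!(reader.get_byte_count(), 4096);
    Ok(())
}
```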
`pageserver/Cargo.toml`:

```diff
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
 
 [dependencies]
 anyhow.workspace = true
+flate2.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
 byteorder.workspace = true
```
`pageserver/src/page_service.rs`:

```diff
@@ -24,14 +24,14 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
 use pq_proto::framed::ConnectionError;
 use pq_proto::FeStartupPacket;
 use pq_proto::{BeMessage, FeMessage, RowDescriptor};
-use std::io;
+use std::io::{self, Write};
 use std::net::TcpListener;
 use std::pin::pin;
 use std::str;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
 use tokio_util::io::StreamReader;
 use tracing::*;
 use utils::id::ConnectionId;
@@ -773,8 +773,9 @@ impl PageServerHandler {
         // Send a tarball of the latest layer on the timeline
         {
             let mut writer = pgb.copyout_writer();
+            let mut raw_tar = Vec::new();
             basebackup::send_basebackup_tarball(
-                &mut writer,
+                &mut raw_tar,
                 &timeline,
                 lsn,
                 prev_lsn,
@@ -782,6 +783,11 @@ impl PageServerHandler {
                 &ctx,
             )
             .await?;
+            let mut encoder =
+                flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
+            encoder.write_all(&raw_tar)?;
+            let compressed_tar = encoder.finish()?;
+            writer.write(&compressed_tar).await?;
         }
 
         pgb.write_message_noflush(&BeMessage::CopyDone)?;
```
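The server side renders the whole tarball into a buffer, compresses it in one shot, and sends the result. A standalone sketch of that write path, with stdout standing in for the COPY-out writer and a dummy buffer standing in for the rendered tarball (both are assumptions of this sketch):

```rust
use std::io::Write;

use flate2::{write::GzEncoder, Compression};

fn main() -> std::io::Result<()> {
    // Stand-in for the tarball produced by send_basebackup_tarball.
    let raw_tar: Vec<u8> = vec![0u8; 1024];

    // Compress the buffered tarball in one pass.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(&raw_tar)?;
    let compressed = encoder.finish()?; // flushes the gzip trailer

    // Stand-in for the COPY-out writer.
    std::io::stdout().write_all(&compressed)?;
    Ok(())
}
```

Buffering the whole tarball in `raw_tar` trades memory for simplicity; a streaming encoder wrapped around the copy-out writer would avoid holding the full basebackup in RAM. Note also that tokio's `AsyncWriteExt::write` (used in the hunk above) may perform a short write; `write_all` would guarantee the entire compressed buffer is sent.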
`test_runner/performance/test_startup.py`:

```diff
@@ -4,6 +4,49 @@ import pytest
 import requests
 from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
 from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.utils import get_dir_size
+
+
+# @pytest.mark.xfail # We currently pass a 16MB pg_wal dir instead of creating it client-side
+def test_basebackup_size(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()
+
+    # Start
+    env.neon_cli.create_branch("test_startup")
+    endpoint = env.endpoints.create_start("test_startup")
+
+    # Get metrics
+    metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
+    basebackup_bytes = metrics["basebackup_bytes"]
+    zenbenchmark.record(
+        "basebackup_size", basebackup_bytes / 1024, "KB", report=MetricReport.LOWER_IS_BETTER
+    )
+
+    # Stop so we force a flush of any files and can measure datadir sizes.
+    # NOTE: the order of this line matters in relation to get_dir_size.
+    datadir = endpoint.pgdata_dir
+    assert datadir is not None  # for mypy
+    endpoint.stop()
+
+    # Even though we don't insert any data, this number could be larger than the
+    # basebackup size because there could theoretically be compression, or postgres
+    # could create or download data during startup. Currently, if we don't send any
+    # pg_wal in the basebackup, postgres will start up just fine, but during
+    # sync-safekeepers, walproposer will try to recover the missing WAL from
+    # safekeepers, causing the same amount of network IO. We want to notice that.
+    datadir_bytes = get_dir_size(datadir)
+    zenbenchmark.record(
+        "datadir_size", datadir_bytes / 1024, "KB", report=MetricReport.LOWER_IS_BETTER
+    )
+
+    wal_bytes = get_dir_size(datadir + "/pg_wal")
+    zenbenchmark.record("wal_size", wal_bytes / 1024, "KB", report=MetricReport.LOWER_IS_BETTER)
+
+    # Seems like a reasonable limit, but increase it if it becomes impossible to meet
+    # assert basebackup_bytes < 70 * 1024
+    # assert datadir_bytes < 70 * 1024
+    # assert wal_bytes < 1 * 1024
+
+
 # Just start and measure duration.
```