mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-04 08:50:38 +00:00
Compare commits
6 Commits
remove_ini
...
basebackup
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2a6e2249bd | ||
|
|
1815ae72f8 | ||
|
|
46c89c4190 | ||
|
|
1fc5c23c01 | ||
|
|
ddb98d6f77 | ||
|
|
45b71fecec |
23
Cargo.lock
generated
23
Cargo.lock
generated
@@ -604,7 +604,7 @@ dependencies = [
|
|||||||
"cc",
|
"cc",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"libc",
|
"libc",
|
||||||
"miniz_oxide",
|
"miniz_oxide 0.6.2",
|
||||||
"object",
|
"object",
|
||||||
"rustc-demangle",
|
"rustc-demangle",
|
||||||
]
|
]
|
||||||
@@ -917,6 +917,7 @@ dependencies = [
|
|||||||
"chrono",
|
"chrono",
|
||||||
"clap 4.3.0",
|
"clap 4.3.0",
|
||||||
"compute_api",
|
"compute_api",
|
||||||
|
"flate2",
|
||||||
"futures",
|
"futures",
|
||||||
"hyper",
|
"hyper",
|
||||||
"notify",
|
"notify",
|
||||||
@@ -1399,6 +1400,16 @@ version = "0.4.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
|
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "flate2"
|
||||||
|
version = "1.0.26"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
|
||||||
|
dependencies = [
|
||||||
|
"crc32fast",
|
||||||
|
"miniz_oxide 0.7.1",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fnv"
|
name = "fnv"
|
||||||
version = "1.0.7"
|
version = "1.0.7"
|
||||||
@@ -2189,6 +2200,15 @@ dependencies = [
|
|||||||
"adler",
|
"adler",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "miniz_oxide"
|
||||||
|
version = "0.7.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
|
||||||
|
dependencies = [
|
||||||
|
"adler",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mio"
|
name = "mio"
|
||||||
version = "0.8.6"
|
version = "0.8.6"
|
||||||
@@ -2558,6 +2578,7 @@ dependencies = [
|
|||||||
"enum-map",
|
"enum-map",
|
||||||
"enumset",
|
"enumset",
|
||||||
"fail",
|
"fail",
|
||||||
|
"flate2",
|
||||||
"futures",
|
"futures",
|
||||||
"git-version",
|
"git-version",
|
||||||
"hex",
|
"hex",
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ license = "Apache-2.0"
|
|||||||
## All dependency versions, used in the project
|
## All dependency versions, used in the project
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||||
|
flate2 = "1.0.26"
|
||||||
async-stream = "0.3"
|
async-stream = "0.3"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
atty = "0.2.14"
|
atty = "0.2.14"
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ license.workspace = true
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
|
flate2.workspace = true
|
||||||
chrono.workspace = true
|
chrono.workspace = true
|
||||||
clap.workspace = true
|
clap.workspace = true
|
||||||
futures.workspace = true
|
futures.workspace = true
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ use utils::lsn::Lsn;
|
|||||||
|
|
||||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||||
|
use utils::measured_stream::MeasuredReader;
|
||||||
|
|
||||||
use crate::config;
|
use crate::config;
|
||||||
use crate::pg_helpers::*;
|
use crate::pg_helpers::*;
|
||||||
@@ -179,16 +180,21 @@ impl ComputeNode {
|
|||||||
_ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
|
_ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
|
||||||
};
|
};
|
||||||
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
|
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
|
||||||
|
let mut measured_reader = MeasuredReader::new(copyreader);
|
||||||
|
let mut decoder = flate2::read::GzDecoder::new(&mut measured_reader);
|
||||||
|
|
||||||
// Read the archive directly from the `CopyOutReader`
|
// Read the archive directly from the `CopyOutReader`
|
||||||
//
|
//
|
||||||
// Set `ignore_zeros` so that unpack() reads all the Copy data and
|
// Set `ignore_zeros` so that unpack() reads all the Copy data and
|
||||||
// doesn't stop at the end-of-archive marker. Otherwise, if the server
|
// doesn't stop at the end-of-archive marker. Otherwise, if the server
|
||||||
// sends an Error after finishing the tarball, we will not notice it.
|
// sends an Error after finishing the tarball, we will not notice it.
|
||||||
let mut ar = tar::Archive::new(copyreader);
|
let mut ar = tar::Archive::new(&mut decoder);
|
||||||
ar.set_ignore_zeros(true);
|
ar.set_ignore_zeros(true);
|
||||||
ar.unpack(&self.pgdata)?;
|
ar.unpack(&self.pgdata)?;
|
||||||
|
|
||||||
|
// Report metrics
|
||||||
|
self.state.lock().unwrap().metrics.basebackup_bytes =
|
||||||
|
measured_reader.get_byte_count() as u64;
|
||||||
self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
|
self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
|
||||||
.signed_duration_since(start_time)
|
.signed_duration_since(start_time)
|
||||||
.to_std()
|
.to_std()
|
||||||
|
|||||||
@@ -71,6 +71,7 @@ pub struct ComputeMetrics {
|
|||||||
pub wait_for_spec_ms: u64,
|
pub wait_for_spec_ms: u64,
|
||||||
pub sync_safekeepers_ms: u64,
|
pub sync_safekeepers_ms: u64,
|
||||||
pub basebackup_ms: u64,
|
pub basebackup_ms: u64,
|
||||||
|
pub basebackup_bytes: u64,
|
||||||
pub config_ms: u64,
|
pub config_ms: u64,
|
||||||
pub total_startup_ms: u64,
|
pub total_startup_ms: u64,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
use std::io::Read;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::{io, task};
|
use std::{io, task};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
@@ -75,3 +76,34 @@ impl<S: AsyncWrite + Unpin, R, W: FnMut(usize)> AsyncWrite for MeasuredStream<S,
|
|||||||
self.project().stream.poll_shutdown(context)
|
self.project().stream.poll_shutdown(context)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wrapper for a synchronous reader that counts the bytes read through it.
///
/// Similar to `MeasuredStream`, but it is one-way (read only) and sync.
///
/// The `Read` bound is placed on the impl blocks rather than on the struct
/// definition, so the type itself can be named without repeating the bound.
#[derive(Debug)]
pub struct MeasuredReader<R> {
    /// The wrapped reader; every `read` call is forwarded to it.
    inner: R,
    /// Total number of bytes returned by successful reads so far.
    byte_count: usize,
}

impl<R: Read> MeasuredReader<R> {
    /// Wraps `reader`, starting the byte counter at zero.
    pub fn new(reader: R) -> Self {
        Self {
            inner: reader,
            byte_count: 0,
        }
    }

    /// Returns the total number of bytes read through this wrapper so far.
    ///
    /// Failed reads do not contribute to the count.
    pub fn get_byte_count(&self) -> usize {
        self.byte_count
    }
}

impl<R: Read> Read for MeasuredReader<R> {
    /// Forwards the read to the inner reader and, on success, adds the number
    /// of bytes it reported to the running total.
    ///
    /// # Errors
    ///
    /// Returns the inner reader's error unchanged; the counter is left as is.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n_bytes = self.inner.read(buf)?;
        // This counter only feeds a metric, so pin it at `usize::MAX` instead
        // of panicking (overflow checks on) or silently wrapping (checks off).
        self.byte_count = self.byte_count.saturating_add(n_bytes);
        Ok(n_bytes)
    }
}
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
|
flate2.workspace = true
|
||||||
async-stream.workspace = true
|
async-stream.workspace = true
|
||||||
async-trait.workspace = true
|
async-trait.workspace = true
|
||||||
byteorder.workspace = true
|
byteorder.workspace = true
|
||||||
|
|||||||
@@ -24,14 +24,14 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
|
|||||||
use pq_proto::framed::ConnectionError;
|
use pq_proto::framed::ConnectionError;
|
||||||
use pq_proto::FeStartupPacket;
|
use pq_proto::FeStartupPacket;
|
||||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||||
use std::io;
|
use std::io::{self, Write};
|
||||||
use std::net::TcpListener;
|
use std::net::TcpListener;
|
||||||
use std::pin::pin;
|
use std::pin::pin;
|
||||||
use std::str;
|
use std::str;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||||
use tokio_util::io::StreamReader;
|
use tokio_util::io::StreamReader;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use utils::id::ConnectionId;
|
use utils::id::ConnectionId;
|
||||||
@@ -773,8 +773,9 @@ impl PageServerHandler {
|
|||||||
// Send a tarball of the latest layer on the timeline
|
// Send a tarball of the latest layer on the timeline
|
||||||
{
|
{
|
||||||
let mut writer = pgb.copyout_writer();
|
let mut writer = pgb.copyout_writer();
|
||||||
|
let mut raw_tar = Vec::new();
|
||||||
basebackup::send_basebackup_tarball(
|
basebackup::send_basebackup_tarball(
|
||||||
&mut writer,
|
&mut raw_tar,
|
||||||
&timeline,
|
&timeline,
|
||||||
lsn,
|
lsn,
|
||||||
prev_lsn,
|
prev_lsn,
|
||||||
@@ -782,6 +783,11 @@ impl PageServerHandler {
|
|||||||
&ctx,
|
&ctx,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
let mut encoder =
|
||||||
|
flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
|
||||||
|
encoder.write_all(&raw_tar)?;
|
||||||
|
let compressed_tar = encoder.finish()?;
|
||||||
|
writer.write(&compressed_tar).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
pgb.write_message_noflush(&BeMessage::CopyDone)?;
|
pgb.write_message_noflush(&BeMessage::CopyDone)?;
|
||||||
|
|||||||
@@ -4,6 +4,49 @@ import pytest
|
|||||||
import requests
|
import requests
|
||||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||||
|
from fixtures.utils import get_dir_size
|
||||||
|
|
||||||
|
|
||||||
|
# @pytest.mark.xfail # We currently pass a 16MB pg_wal dir instead of creating it client-side
def test_basebackup_size(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
    """Record how much data a freshly started endpoint receives and holds on disk.

    Starts an environment with three safekeepers, creates a branch and starts an
    endpoint on it, then records three benchmark metrics, each divided by 1024
    and reported with the "KB" unit label:

    * ``basebackup_size`` - ``basebackup_bytes`` as reported by the endpoint's
      ``/metrics.json`` HTTP endpoint.
    * ``datadir_size`` - size of the endpoint's data directory after it is stopped.
    * ``wal_size`` - size of the ``pg_wal`` subdirectory of that data directory.

    The size assertions at the end are currently commented out, so the test
    only records the numbers and does not enforce any limit.
    """
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()

    # Start
    env.neon_cli.create_branch("test_startup")
    endpoint = env.endpoints.create_start("test_startup")

    # Get metrics
    metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
    basebackup_bytes = metrics["basebackup_bytes"]
    zenbenchmark.record(
        "basebackup_size", basebackup_bytes / 1024, "KB", report=MetricReport.LOWER_IS_BETTER
    )

    # Stop so we force flush of any files and we can measure datadir sizes
    # NOTE the order of this line is important in relation to get_dir_size
    datadir = endpoint.pgdata_dir
    assert datadir is not None  # for mypy
    endpoint.stop()

    # Even though we don't insert any data, this number could be larger than basebackup
    # size because there could theoretically be compression, or postgres could create
    # or download data during startup. Currently if we don't send any pg_wal in the
    # basebackup, postgres will start up just fine, but during sync-safekeepers,
    # walproposer will try to recover the missing wal from safekeepers and cause the
    # same amount of network IO. We want to notice that if it happens.
    datadir_bytes = get_dir_size(datadir)
    zenbenchmark.record(
        "datadir_size", datadir_bytes / 1024, "KB", report=MetricReport.LOWER_IS_BETTER
    )

    wal_bytes = get_dir_size(datadir + "/pg_wal")
    zenbenchmark.record("wal_size", wal_bytes / 1024, "KB", report=MetricReport.LOWER_IS_BETTER)

    # Seems like a reasonable limit, but increase it if it becomes impossible to meet
    # assert basebackup_bytes < 70 * 1024
    # assert datadir_bytes < 70 * 1024
    # assert wal_bytes < 1 * 1024
|
||||||
|
|
||||||
|
|
||||||
# Just start and measure duration.
|
# Just start and measure duration.
|
||||||
|
|||||||
Reference in New Issue
Block a user