Add timeline physical size tracking (#2126)
Ref #1902.

- Track the layered timeline's `physical_size` via the `pageserver_current_physical_size` metric, updated whenever the layer map changes.
- Report the local timeline's `physical_size` in the timeline GET APIs.
- Add an `include-non-incremental-physical-size` URL flag to also report the local timeline's `physical_size_non_incremental` (similar to `logical_size_non_incremental`).
- Add `UIntGaugeVec` and `UIntGauge` types to represent `u64` Prometheus metrics.

Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
@@ -3,6 +3,9 @@
 //! Otherwise, we might not see all metrics registered via
 //! a default registry.
 use lazy_static::lazy_static;
+use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec};
+pub use prometheus::opts;
+pub use prometheus::register;
 pub use prometheus::{core, default_registry, proto};
 pub use prometheus::{exponential_buckets, linear_buckets};
 pub use prometheus::{register_gauge, Gauge};
@@ -18,6 +21,17 @@ pub use prometheus::{Encoder, TextEncoder};
 mod wrappers;
 pub use wrappers::{CountedReader, CountedWriter};
 
+pub type UIntGauge = GenericGauge<AtomicU64>;
+pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
+
+#[macro_export]
+macro_rules! register_uint_gauge_vec {
+    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
+        let gauge_vec = UIntGaugeVec::new($crate::opts!($NAME, $HELP), $LABELS_NAMES).unwrap();
+        $crate::register(Box::new(gauge_vec.clone())).map(|_| gauge_vec)
+    }};
+}
+
 /// Gathers all Prometheus metrics and records the I/O stats just before that.
 ///
 /// Metrics gathering is a relatively simple and standalone operation, so
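A minimal sketch of how the new macro is meant to be consumed: a `lazy_static` registration followed by per-label updates. The metric name and helper function below are hypothetical, for illustration only:

use lazy_static::lazy_static;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};

lazy_static! {
    // Hypothetical example metric; not one defined by this commit.
    static ref EXAMPLE_SIZE: UIntGaugeVec = register_uint_gauge_vec!(
        "example_current_size",
        "Example u64 gauge grouped by timeline",
        &["tenant_id", "timeline_id"]
    )
    .expect("failed to define a metric");
}

fn set_example_size(tenant_id: &str, timeline_id: &str, bytes: u64) {
    // with_label_values lazily creates the child gauge for this label pair;
    // set stores the u64 value atomically.
    EXAMPLE_SIZE
        .with_label_values(&[tenant_id, timeline_id])
        .set(bytes);
}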
@@ -78,6 +78,11 @@ paths:
         schema:
           type: string
         description: Controls calculation of current_logical_size_non_incremental
+      - name: include-non-incremental-physical-size
+        in: query
+        schema:
+          type: string
+        description: Controls calculation of current_physical_size_non_incremental
     get:
       description: Get timelines for tenant
       responses:
@@ -136,6 +141,11 @@ paths:
         schema:
           type: string
         description: Controls calculation of current_logical_size_non_incremental
+      - name: include-non-incremental-physical-size
+        in: query
+        schema:
+          type: string
+        description: Controls calculation of current_physical_size_non_incremental
     responses:
       "200":
         description: TimelineInfo
@@ -671,8 +681,12 @@ components:
           format: hex
         current_logical_size:
           type: integer
+        current_physical_size:
+          type: integer
         current_logical_size_non_incremental:
           type: integer
+        current_physical_size_non_incremental:
+          type: integer
 
     WalReceiverEntry:
       type: object
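A sketch of how a client could exercise the new flag against this endpoint. The helper below is hypothetical and assumes the `reqwest` crate (with its `blocking` and `json` features) plus `serde_json`; the real test suite uses the Python `NeonPageserverHttpClient`, updated further below:

// Hypothetical client helper, illustration only.
fn fetch_timeline_detail(
    base: &str,
    tenant_id: &str,
    timeline_id: &str,
) -> reqwest::Result<serde_json::Value> {
    let url = format!(
        "{base}/v1/tenant/{tenant_id}/timeline/{timeline_id}\
         ?include-non-incremental-logical-size=1&include-non-incremental-physical-size=1"
    );
    // Because the flag is present, the response carries
    // current_physical_size_non_incremental alongside current_physical_size.
    reqwest::blocking::get(url)?.json()
}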
@@ -113,10 +113,17 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
 async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;
-    let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
+    let include_non_incremental_logical_size =
+        query_param_present(&request, "include-non-incremental-logical-size");
+    let include_non_incremental_physical_size =
+        query_param_present(&request, "include-non-incremental-physical-size");
     let local_timeline_infos = tokio::task::spawn_blocking(move || {
         let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
-        crate::timelines::get_local_timelines(tenant_id, include_non_incremental_logical_size)
+        crate::timelines::get_local_timelines(
+            tenant_id,
+            include_non_incremental_logical_size,
+            include_non_incremental_physical_size,
+        )
     })
     .await
    .map_err(ApiError::from_err)??;
@@ -145,17 +152,15 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
     json_response(StatusCode::OK, response_data)
 }
 
-// Gate non incremental logical size calculation behind a flag
-// after pgbench -i -s100 calculation took 28ms so if multiplied by the number of timelines
-// and tenants it can take noticeable amount of time. Also the value currently used only in tests
-fn get_include_non_incremental_logical_size(request: &Request<Body>) -> bool {
+/// Checks if a query param is present in the request's URL
+fn query_param_present(request: &Request<Body>, param: &str) -> bool {
     request
         .uri()
         .query()
         .map(|v| {
             url::form_urlencoded::parse(v.as_bytes())
                 .into_owned()
-                .any(|(param, _)| param == "include-non-incremental-logical-size")
+                .any(|(p, _)| p == param)
         })
         .unwrap_or(false)
 }
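Note the presence-only semantics of the check above: the parameter's value is never inspected, so `?include-non-incremental-physical-size=0` enables the calculation just like `=1`. A standalone sketch of the same logic over a raw query string, assuming the `url` crate:

// Equivalent of query_param_present, minus the hyper Request plumbing.
fn param_present(query: Option<&str>, param: &str) -> bool {
    query
        .map(|v| {
            url::form_urlencoded::parse(v.as_bytes())
                .into_owned()
                .any(|(p, _)| p == param)
        })
        .unwrap_or(false)
}

fn main() {
    assert!(param_present(
        Some("include-non-incremental-physical-size=0"),
        "include-non-incremental-physical-size"
    ));
    assert!(!param_present(None, "include-non-incremental-physical-size"));
}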
@@ -165,7 +170,10 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
     check_permission(&request, Some(tenant_id))?;
 
     let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
-    let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
+    let include_non_incremental_logical_size =
+        query_param_present(&request, "include-non-incremental-logical-size");
+    let include_non_incremental_physical_size =
+        query_param_present(&request, "include-non-incremental-physical-size");
 
     let (local_timeline_info, remote_timeline_info) = async {
         // any error here will render local timeline as None
@@ -181,6 +189,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
                 timeline_id,
                 timeline,
                 include_non_incremental_logical_size,
+                include_non_incremental_physical_size,
             )
         })
         .transpose()?
@@ -20,7 +20,8 @@ use std::time::{Duration, SystemTime};
 
 use metrics::{
     register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
-    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
+    register_uint_gauge_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
+    IntGaugeVec, UIntGauge, UIntGaugeVec,
 };
 
 use crate::layered_repository::{
@@ -101,6 +102,18 @@ lazy_static! {
         .expect("failed to define a metric");
 }
 
+// Metrics for determining a timeline's physical size.
+// A layered timeline's physical size is defined as the total size of its
+// (delta/image) layer files on disk.
+lazy_static! {
+    static ref CURRENT_PHYSICAL_SIZE: UIntGaugeVec = register_uint_gauge_vec!(
+        "pageserver_current_physical_size",
+        "Current physical size grouped by timeline",
+        &["tenant_id", "timeline_id"]
+    )
+    .expect("failed to define a metric");
+}
+
 // Metrics for cloud upload. These metrics reflect data uploaded to cloud storage,
 // or in testing they estimate how much we would upload if we did.
 lazy_static! {
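Since the macro registers against the default registry, registration can be sanity-checked from a unit test. A sketch using the `default_registry` re-export shown earlier (note that the `lazy_static` gauge must have been dereferenced at least once before it appears in the registry):

// Sketch: verify the gauge landed in the default Prometheus registry.
fn gauge_is_registered() -> bool {
    metrics::default_registry()
        .gather()
        .iter()
        .any(|family| family.get_name() == "pageserver_current_physical_size")
}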
@@ -233,6 +246,7 @@ pub struct LayeredTimeline {
     create_images_time_histo: Histogram,
     last_record_gauge: IntGauge,
     wait_lsn_time_histo: Histogram,
+    current_physical_size_gauge: UIntGauge,
 
     /// If `true`, will backup its files that appear after each checkpointing to the remote storage.
     upload_layers: AtomicBool,
@@ -437,6 +451,30 @@ impl Timeline for LayeredTimeline {
             _write_guard: self.write_lock.lock().unwrap(),
         })
     }
+
+    fn get_physical_size(&self) -> u64 {
+        self.current_physical_size_gauge.get()
+    }
+
+    fn get_physical_size_non_incremental(&self) -> anyhow::Result<u64> {
+        let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
+        // total size of layer files in the current timeline directory
+        let mut total_physical_size = 0;
+
+        for direntry in fs::read_dir(timeline_path)? {
+            let direntry = direntry?;
+            let fname = direntry.file_name();
+            let fname = fname.to_string_lossy();
+
+            if ImageFileName::parse_str(&fname).is_some()
+                || DeltaFileName::parse_str(&fname).is_some()
+            {
+                total_physical_size += direntry.metadata()?.len();
+            }
+        }
+
+        Ok(total_physical_size)
+    }
 }
 
 impl LayeredTimeline {
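To make the relationship between the two accessors concrete: the gauge is the cheap, incrementally maintained answer, while the directory scan is the authoritative one. A hedged sketch of a consistency check a caller could run (illustrative helper, not part of the commit):

// The incremental gauge should agree with a fresh directory scan whenever
// no flush, compaction, or GC is running concurrently.
fn check_physical_size(timeline: &dyn Timeline) -> anyhow::Result<()> {
    let incremental = timeline.get_physical_size();
    let scanned = timeline.get_physical_size_non_incremental()?;
    anyhow::ensure!(
        incremental == scanned,
        "physical size gauge ({incremental}) diverged from on-disk size ({scanned})"
    );
    Ok(())
}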
@@ -515,6 +553,9 @@ impl LayeredTimeline {
         let wait_lsn_time_histo = WAIT_LSN_TIME
             .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
             .unwrap();
+        let current_physical_size_gauge = CURRENT_PHYSICAL_SIZE
+            .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
+            .unwrap();
 
         let mut result = LayeredTimeline {
             conf,
@@ -544,6 +585,7 @@
             create_images_time_histo,
             last_record_gauge,
             wait_lsn_time_histo,
+            current_physical_size_gauge,
 
             upload_layers: AtomicBool::new(upload_layers),
 
@@ -579,6 +621,8 @@
         // Scan timeline directory and create ImageFileName and DeltaFilename
         // structs representing all files on disk
         let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
+        // total size of layer files in the current timeline directory
+        let mut total_physical_size = 0;
 
         for direntry in fs::read_dir(timeline_path)? {
             let direntry = direntry?;
@@ -601,6 +645,7 @@
                     ImageLayer::new(self.conf, self.timeline_id, self.tenant_id, &imgfilename);
 
                 trace!("found layer {}", layer.filename().display());
+                total_physical_size += layer.path().metadata()?.len();
                 layers.insert_historic(Arc::new(layer));
                 num_layers += 1;
             } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
@@ -624,6 +669,7 @@
                     DeltaLayer::new(self.conf, self.timeline_id, self.tenant_id, &deltafilename);
 
                 trace!("found layer {}", layer.filename().display());
+                total_physical_size += layer.path().metadata()?.len();
                 layers.insert_historic(Arc::new(layer));
                 num_layers += 1;
             } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
@@ -640,9 +686,10 @@
         layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1);
 
         info!(
-            "loaded layer map with {} layers at {}",
-            num_layers, disk_consistent_lsn
+            "loaded layer map with {} layers at {}, total physical size: {}",
+            num_layers, disk_consistent_lsn, total_physical_size
         );
+        self.current_physical_size_gauge.set(total_physical_size);
 
         Ok(())
     }
@@ -1203,8 +1250,12 @@
             layers.insert_historic(Arc::new(new_delta));
         }
 
+        // update the timeline's physical size
+        let sz = new_delta_path.metadata()?.len();
+        self.current_physical_size_gauge.add(sz);
+
         // update metrics
         NUM_PERSISTENT_FILES_CREATED.inc_by(1);
-        PERSISTENT_BYTES_WRITTEN.inc_by(new_delta_path.metadata()?.len());
+        PERSISTENT_BYTES_WRITTEN.inc_by(sz);
 
         Ok(new_delta_path)
     }
@@ -1390,6 +1441,8 @@
 
         let mut layers = self.layers.write().unwrap();
         for l in image_layers {
+            self.current_physical_size_gauge
+                .add(l.path().metadata()?.len());
             layers.insert_historic(Arc::new(l));
         }
         drop(layers);
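This is the add half of the gauge bookkeeping: whenever a new layer file lands on disk (a delta flush above, or freshly created image layers), its file size is added to the gauge. A minimal sketch of the convention, with `gauge` standing in for the timeline's `current_physical_size_gauge` (illustrative helper only):

use std::path::Path;

// Grow the physical-size gauge by the size of a freshly written layer file.
fn account_new_layer(gauge: &metrics::UIntGauge, layer_path: &Path) -> std::io::Result<()> {
    gauge.add(layer_path.metadata()?.len());
    Ok(())
}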
@@ -1635,19 +1688,27 @@
         let mut layers = self.layers.write().unwrap();
         let mut new_layer_paths = HashSet::with_capacity(new_layers.len());
         for l in new_layers {
-            new_layer_paths.insert(l.path());
+            let new_delta_path = l.path();
+
+            // update the timeline's physical size
+            self.current_physical_size_gauge
+                .add(new_delta_path.metadata()?.len());
+
+            new_layer_paths.insert(new_delta_path);
             layers.insert_historic(Arc::new(l));
         }
 
         // Now that we have reshuffled the data to set of new delta layers, we can
         // delete the old ones
         let mut layer_paths_do_delete = HashSet::with_capacity(deltas_to_compact.len());
-        for l in &deltas_to_compact {
-            l.delete()?;
-            layers.remove_historic(l.clone());
+        drop(all_keys_iter);
+        for l in deltas_to_compact {
+            if let Some(path) = l.local_path() {
+                self.current_physical_size_gauge.sub(path.metadata()?.len());
+                layer_paths_do_delete.insert(path);
+            }
+            l.delete()?;
+            layers.remove_historic(l);
         }
         drop(layers);
 
@@ -1899,10 +1960,11 @@
         // while iterating it. BTreeMap::retain() would be another option)
         let mut layer_paths_to_delete = HashSet::with_capacity(layers_to_remove.len());
         for doomed_layer in layers_to_remove {
-            doomed_layer.delete()?;
+            if let Some(path) = doomed_layer.local_path() {
+                self.current_physical_size_gauge.sub(path.metadata()?.len());
+                layer_paths_to_delete.insert(path);
+            }
+            doomed_layer.delete()?;
             layers.remove_historic(doomed_layer);
             result.layers_removed += 1;
         }
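Compaction and GC form the subtract half: before a layer file is deleted, its size is taken off the gauge. The `local_path()` accessor returning an `Option` suggests some layers have no file on local disk, in which case there is nothing to subtract; a sketch under that assumption (illustrative helper only):

use std::path::PathBuf;

// Shrink the gauge when a layer's local file is about to be deleted;
// layers without a local file leave the gauge untouched.
fn account_removed_layer(
    gauge: &metrics::UIntGauge,
    local_path: Option<PathBuf>,
) -> std::io::Result<()> {
    if let Some(path) = local_path {
        gauge.sub(path.metadata()?.len());
    }
    Ok(())
}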
@@ -382,6 +382,11 @@ pub trait Timeline: Send + Sync {
         lsn: Lsn,
         latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
     ) -> Result<()>;
+
+    /// Get the physical size of the timeline at the latest LSN
+    fn get_physical_size(&self) -> u64;
+    /// Get the physical size of the timeline at the latest LSN, non-incrementally
+    fn get_physical_size_non_incremental(&self) -> Result<u64>;
 }
 
 /// Various functions to mutate the timeline.
@@ -49,7 +49,9 @@ pub struct LocalTimelineInfo {
     #[serde_as(as = "DisplayFromStr")]
     pub disk_consistent_lsn: Lsn,
     pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
+    pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
     pub current_logical_size_non_incremental: Option<usize>,
+    pub current_physical_size_non_incremental: Option<u64>,
     pub timeline_state: LocalTimelineState,
 }
@@ -57,6 +59,7 @@ impl LocalTimelineInfo {
     pub fn from_loaded_timeline(
         timeline: &TimelineImpl,
         include_non_incremental_logical_size: bool,
+        include_non_incremental_physical_size: bool,
     ) -> anyhow::Result<Self> {
         let last_record_lsn = timeline.get_last_record_lsn();
         let info = LocalTimelineInfo {
@@ -72,12 +75,18 @@
             prev_record_lsn: Some(timeline.get_prev_record_lsn()),
             latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
             timeline_state: LocalTimelineState::Loaded,
+            current_physical_size: Some(timeline.get_physical_size()),
             current_logical_size: Some(timeline.get_current_logical_size()),
             current_logical_size_non_incremental: if include_non_incremental_logical_size {
                 Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
             } else {
                 None
             },
+            current_physical_size_non_incremental: if include_non_incremental_physical_size {
+                Some(timeline.get_physical_size_non_incremental()?)
+            } else {
+                None
+            },
         };
         Ok(info)
     }
@@ -97,7 +106,9 @@
             latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
             timeline_state: LocalTimelineState::Unloaded,
             current_logical_size: None,
+            current_physical_size: None,
             current_logical_size_non_incremental: None,
+            current_physical_size_non_incremental: None,
         }
     }
@@ -106,11 +117,16 @@
         timeline_id: ZTimelineId,
         repo_timeline: &RepositoryTimeline<T>,
         include_non_incremental_logical_size: bool,
+        include_non_incremental_physical_size: bool,
     ) -> anyhow::Result<Self> {
         match repo_timeline {
             RepositoryTimeline::Loaded(_) => {
                 let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
-                Self::from_loaded_timeline(&*timeline, include_non_incremental_logical_size)
+                Self::from_loaded_timeline(
+                    &*timeline,
+                    include_non_incremental_logical_size,
+                    include_non_incremental_physical_size,
+                )
             }
             RepositoryTimeline::Unloaded { metadata } => Ok(Self::from_unloaded_timeline(metadata)),
         }
@@ -320,6 +336,7 @@ fn bootstrap_timeline<R: Repository>(
 pub(crate) fn get_local_timelines(
     tenant_id: ZTenantId,
     include_non_incremental_logical_size: bool,
+    include_non_incremental_physical_size: bool,
 ) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
     let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
         .with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;
@@ -334,6 +351,7 @@ pub(crate) fn get_local_timelines(
                 timeline_id,
                 &repository_timeline,
                 include_non_incremental_logical_size,
+                include_non_incremental_physical_size,
             )?,
         ))
     }
@@ -387,7 +405,7 @@ pub(crate) fn create_timeline(
             // load the timeline into memory
             let loaded_timeline =
                 tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
-            LocalTimelineInfo::from_loaded_timeline(&*loaded_timeline, false)
+            LocalTimelineInfo::from_loaded_timeline(&*loaded_timeline, false, false)
                 .context("cannot fill timeline info")?
         }
         None => {
@@ -395,7 +413,7 @@
             // load the timeline into memory
             let new_timeline =
                 tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
-            LocalTimelineInfo::from_loaded_timeline(&*new_timeline, false)
+            LocalTimelineInfo::from_loaded_timeline(&*new_timeline, false, false)
                 .context("cannot fill timeline info")?
         }
     };
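Restating the reporting rules encoded above as a small sketch (hypothetical helper, illustration only): both physical-size fields are `None` for unloaded timelines; for loaded ones the incremental value is always reported, and the non-incremental scan runs only when the corresponding flag was passed:

fn physical_size_fields(
    timeline: Option<&dyn Timeline>,
    include_non_incremental: bool,
) -> anyhow::Result<(Option<u64>, Option<u64>)> {
    match timeline {
        // Unloaded timeline: nothing to report.
        None => Ok((None, None)),
        Some(tl) => {
            let non_incremental = if include_non_incremental {
                Some(tl.get_physical_size_non_incremental()?)
            } else {
                None
            };
            Ok((Some(tl.get_physical_size()), non_incremental))
        }
    }
}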
@@ -1,10 +1,15 @@
 from contextlib import closing
 import pathlib
+from uuid import UUID
+import re
 import psycopg2.extras
 import psycopg2.errors
 from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, assert_timeline_local
 from fixtures.log_helper import log
 import time
+
+from fixtures.utils import get_timeline_dir_size
+
 
 def test_timeline_size(neon_simple_env: NeonEnv):
     env = neon_simple_env
@@ -176,3 +181,129 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
             cur.execute("SELECT * from pg_size_pretty(pg_cluster_size())")
             pg_cluster_size = cur.fetchone()
             log.info(f"pg_cluster_size = {pg_cluster_size}")
+
+
+def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_init')
+    pg = env.postgres.create_start("test_timeline_physical_size_init")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 1000) g""",
+    ])
+
+    # restart the pageserver to force calculating the timeline's initial physical size
+    env.pageserver.stop()
+    env.pageserver.start()
+
+    assert_physical_size(env, env.initial_tenant, new_timeline_id)
+
+
+def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_checkpoint')
+    pg = env.postgres.create_start("test_timeline_physical_size_post_checkpoint")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 1000) g""",
+    ])
+
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+    assert_physical_size(env, env.initial_tenant, new_timeline_id)
+
+
+def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder):
+    # Disable background compaction: we don't want it to run after the `get_physical_size`
+    # request and before we check the expected size on disk, which would make the assertion fail
+    neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='10m'}"
+
+    env = neon_env_builder.init_start()
+
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_compaction')
+    pg = env.postgres.create_start("test_timeline_physical_size_post_compaction")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 100000) g""",
+    ])
+
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+    env.pageserver.safe_psql(f"compact {env.initial_tenant.hex} {new_timeline_id.hex}")
+    assert_physical_size(env, env.initial_tenant, new_timeline_id)
+
+
+def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
+    # Disable background compaction and GC: we don't want them to run after the `get_physical_size`
+    # request and before we check the expected size on disk, which would make the assertion fail
+    neon_env_builder.pageserver_config_override = \
+        "tenant_config={checkpoint_distance=100000, compaction_period='10m', gc_period='10m', pitr_interval='1s'}"
+
+    env = neon_env_builder.init_start()
+
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_gc')
+    pg = env.postgres.create_start("test_timeline_physical_size_post_gc")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 100000) g""",
+    ])
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+    pg.safe_psql("""
+        INSERT INTO foo
+        SELECT 'long string to consume some space' || g
+        FROM generate_series(1, 100000) g
+    """)
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+
+    env.pageserver.safe_psql(f"do_gc {env.initial_tenant.hex} {new_timeline_id.hex} 0")
+    assert_physical_size(env, env.initial_tenant, new_timeline_id)
+
+
+def test_timeline_physical_size_metric(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_metric')
+    pg = env.postgres.create_start("test_timeline_physical_size_metric")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 100000) g""",
+    ])
+
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+
+    # get the metrics and parse the metric for the current timeline's physical size
+    metrics = env.pageserver.http_client().get_metrics()
+    matches = re.search(
+        f'^pageserver_current_physical_size{{tenant_id="{env.initial_tenant.hex}",timeline_id="{new_timeline_id.hex}"}} (\\S+)$',
+        metrics,
+        re.MULTILINE)
+    assert matches
+
+    # assert that the metric matches the actual physical size on disk
+    tl_physical_size_metric = int(matches.group(1))
+    timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id)
+    assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)
+
+
+def assert_physical_size(env: NeonEnv, tenant_id: UUID, timeline_id: UUID):
+    """Check that the current physical size returned from the timeline API
+    matches the total physical size of the timeline on disk"""
+    client = env.pageserver.http_client()
+    res = assert_timeline_local(client, tenant_id, timeline_id)
+    timeline_path = env.timeline_dir(tenant_id, timeline_id)
+    assert res["local"]["current_physical_size"] == res["local"][
+        "current_physical_size_non_incremental"]
+    assert res["local"]["current_physical_size"] == get_timeline_dir_size(timeline_path)
@@ -691,6 +691,10 @@ class NeonEnv:
         """ Get list of safekeeper endpoints suitable for safekeepers GUC """
         return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
 
+    def timeline_dir(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Path:
+        """Get a timeline directory's path based on the repo directory of the test environment"""
+        return self.repo_dir / "tenants" / tenant_id.hex / "timelines" / timeline_id.hex
+
     @cached_property
     def auth_keys(self) -> AuthKeys:
         pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
@@ -863,8 +867,8 @@ class NeonPageserverHttpClient(requests.Session):
 
     def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
         res = self.get(
-            f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1"
-        )
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}" +
+            "?include-non-incremental-logical-size=1&include-non-incremental-physical-size=1")
         self.verbose_error(res)
         res_json = res.json()
         assert isinstance(res_json, dict)
@@ -1,9 +1,11 @@
+import contextlib
 import os
 import pathlib
 import shutil
 import subprocess
 from pathlib import Path
 
-from typing import Any, List
+from typing import Any, List, Tuple
 from fixtures.log_helper import log
 
@@ -89,3 +91,36 @@ def get_dir_size(path: str) -> int:
             pass # file could be concurrently removed
 
     return totalbytes
+
+
+def get_timeline_dir_size(path: pathlib.Path) -> int:
+    """Get the timeline directory's total size, which only counts the layer files' size."""
+    sz = 0
+    for dir_entry in path.iterdir():
+        with contextlib.suppress(Exception):
+            # file is an image layer
+            _ = parse_image_layer(dir_entry.name)
+            sz += dir_entry.stat().st_size
+            continue
+
+        with contextlib.suppress(Exception):
+            # file is a delta layer
+            _ = parse_delta_layer(dir_entry.name)
+            sz += dir_entry.stat().st_size
+            continue
+    return sz
+
+
+def parse_image_layer(f_name: str) -> Tuple[int, int, int]:
+    """Parse an image layer file name. Return key start, key end, and snapshot lsn"""
+    parts = f_name.split("__")
+    key_parts = parts[0].split("-")
+    return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16)
+
+
+def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
+    """Parse a delta layer file name. Return key start, key end, lsn start, and lsn end"""
+    parts = f_name.split("__")
+    key_parts = parts[0].split("-")
+    lsn_parts = parts[1].split("-")
+    return int(key_parts[0], 16), int(key_parts[1], 16), int(lsn_parts[0], 16), int(lsn_parts[1], 16)