diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs
index 3b5da9f7ff..ea24b3fe7e 100644
--- a/libs/metrics/src/lib.rs
+++ b/libs/metrics/src/lib.rs
@@ -3,6 +3,9 @@
 //! Otherwise, we might not see all metrics registered via
 //! a default registry.
 use lazy_static::lazy_static;
+use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec};
+pub use prometheus::opts;
+pub use prometheus::register;
 pub use prometheus::{core, default_registry, proto};
 pub use prometheus::{exponential_buckets, linear_buckets};
 pub use prometheus::{register_gauge, Gauge};
@@ -18,6 +21,17 @@ pub use prometheus::{Encoder, TextEncoder};
 mod wrappers;
 pub use wrappers::{CountedReader, CountedWriter};
 
+pub type UIntGauge = GenericGauge<AtomicU64>;
+pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
+
+#[macro_export]
+macro_rules! register_uint_gauge_vec {
+    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
+        let gauge_vec = UIntGaugeVec::new($crate::opts!($NAME, $HELP), $LABELS_NAMES).unwrap();
+        $crate::register(Box::new(gauge_vec.clone())).map(|_| gauge_vec)
+    }};
+}
+
 /// Gathers all Prometheus metrics and records the I/O stats just before that.
 ///
 /// Metrics gathering is a relatively simple and standalone operation, so
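For context, a minimal sketch of a call site for the new macro — modeled on the `CURRENT_PHYSICAL_SIZE` gauge vec that the `layered_repository.rs` hunks below rely on. That registration block is elided from this excerpt, so the metric and label names here are taken from the assertion in `test_timeline_physical_size_metric`, and the help text is illustrative. Note that the macro body references `UIntGaugeVec` unqualified, so callers must have it in scope:

```rust
use lazy_static::lazy_static;
use metrics::{register_uint_gauge_vec, UIntGauge, UIntGaugeVec};

lazy_static! {
    // One u64 gauge per (tenant, timeline); looked up once when the timeline loads.
    static ref CURRENT_PHYSICAL_SIZE: UIntGaugeVec = register_uint_gauge_vec!(
        "pageserver_current_physical_size",
        "Sum of layer file sizes in the timeline directory",
        &["tenant_id", "timeline_id"]
    )
    .expect("failed to register pageserver_current_physical_size gauge vec");
}

fn physical_size_gauge(tenant_id: &str, timeline_id: &str) -> UIntGauge {
    CURRENT_PHYSICAL_SIZE.with_label_values(&[tenant_id, timeline_id])
}
```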
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index 2775a27e0f..46305a4855 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -78,6 +78,11 @@ paths:
         schema:
           type: string
         description: Controls calculation of current_logical_size_non_incremental
+      - name: include-non-incremental-physical-size
+        in: query
+        schema:
+          type: string
+        description: Controls calculation of current_physical_size_non_incremental
     get:
       description: Get timelines for tenant
       responses:
@@ -136,6 +141,11 @@
         schema:
           type: string
         description: Controls calculation of current_logical_size_non_incremental
+      - name: include-non-incremental-physical-size
+        in: query
+        schema:
+          type: string
+        description: Controls calculation of current_physical_size_non_incremental
       responses:
         "200":
           description: TimelineInfo
@@ -671,8 +681,12 @@ components:
           format: hex
         current_logical_size:
           type: integer
+        current_physical_size:
+          type: integer
         current_logical_size_non_incremental:
           type: integer
+        current_physical_size_non_incremental:
+          type: integer
     WalReceiverEntry:
       type: object
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 236415cf58..8ac3faca7a 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -113,10 +113,17 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response
 async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;
-    let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
+    let include_non_incremental_logical_size =
+        query_param_present(&request, "include-non-incremental-logical-size");
+    let include_non_incremental_physical_size =
+        query_param_present(&request, "include-non-incremental-physical-size");
     let local_timeline_infos = tokio::task::spawn_blocking(move || {
         let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
-        crate::timelines::get_local_timelines(tenant_id, include_non_incremental_logical_size)
+        crate::timelines::get_local_timelines(
+            tenant_id,
+            include_non_incremental_logical_size,
+            include_non_incremental_physical_size,
+        )
     })
     .await
     .map_err(ApiError::from_err)??;
@@ -145,17 +152,15 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>
     json_response(StatusCode::OK, response_data)
 }
 
-// Gate non incremental logical size calculation behind a flag
-// after pgbench -i -s100 calculation took 28ms so if multiplied by the number of timelines
-// and tenants it can take noticeable amount of time. Also the value currently used only in tests
-fn get_include_non_incremental_logical_size(request: &Request<Body>) -> bool {
+/// Checks if a query param is present in the request's URL
+fn query_param_present(request: &Request<Body>, param: &str) -> bool {
    request
        .uri()
        .query()
        .map(|v| {
            url::form_urlencoded::parse(v.as_bytes())
                .into_owned()
-                .any(|(param, _)| param == "include-non-incremental-logical-size")
+                .any(|(p, _)| p == param)
        })
        .unwrap_or(false)
 }
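Both flags are pure presence checks: `query_param_present` never looks at the value, so `?include-non-incremental-physical-size=1`, `=true`, or a bare flag all behave the same (which is why the OpenAPI spec can only describe the parameter loosely as a string). A standalone sketch of that check, using the same `url` crate call the handler does:

```rust
// Presence check over a raw query string; deliberately ignores the value,
// matching query_param_present in routes.rs.
fn query_param_present(query: &str, param: &str) -> bool {
    url::form_urlencoded::parse(query.as_bytes())
        .into_owned()
        .any(|(p, _)| p == param)
}

fn main() {
    let q = "include-non-incremental-logical-size=1&include-non-incremental-physical-size=true";
    assert!(query_param_present(q, "include-non-incremental-physical-size"));
    assert!(!query_param_present(q, "no-such-flag"));
}
```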
@@ -165,7 +170,10 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Bod
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
+    fn get_physical_size(&self) -> u64 {
+        self.current_physical_size_gauge.get()
+    }
+
+    fn get_physical_size_non_incremental(&self) -> anyhow::Result<u64> {
+        let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
+        // total size of layer files in the current timeline directory
+        let mut total_physical_size = 0;
+
+        for direntry in fs::read_dir(timeline_path)? {
+            let direntry = direntry?;
+            let fname = direntry.file_name();
+            let fname = fname.to_string_lossy();
+
+            if ImageFileName::parse_str(&fname).is_some()
+                || DeltaFileName::parse_str(&fname).is_some()
+            {
+                total_physical_size += direntry.metadata()?.len();
+            }
+        }
+
+        Ok(total_physical_size)
+    }
 }
 
 impl LayeredTimeline {
@@ -515,6 +553,9 @@ impl LayeredTimeline {
         let wait_lsn_time_histo = WAIT_LSN_TIME
             .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
             .unwrap();
+        let current_physical_size_gauge = CURRENT_PHYSICAL_SIZE
+            .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
+            .unwrap();
 
         let mut result = LayeredTimeline {
             conf,
@@ -544,6 +585,7 @@ impl LayeredTimeline {
             create_images_time_histo,
             last_record_gauge,
             wait_lsn_time_histo,
+            current_physical_size_gauge,
 
             upload_layers: AtomicBool::new(upload_layers),
 
@@ -579,6 +621,8 @@ impl LayeredTimeline {
         // Scan timeline directory and create ImageFileName and DeltaFilename
         // structs representing all files on disk
         let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
+        // total size of layer files in the current timeline directory
+        let mut total_physical_size = 0;
 
         for direntry in fs::read_dir(timeline_path)? {
             let direntry = direntry?;
@@ -601,6 +645,7 @@ impl LayeredTimeline {
                     ImageLayer::new(self.conf, self.timeline_id, self.tenant_id, &imgfilename);
 
                 trace!("found layer {}", layer.filename().display());
+                total_physical_size += layer.path().metadata()?.len();
                 layers.insert_historic(Arc::new(layer));
                 num_layers += 1;
             } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
@@ -624,6 +669,7 @@ impl LayeredTimeline {
                     DeltaLayer::new(self.conf, self.timeline_id, self.tenant_id, &deltafilename);
 
                 trace!("found layer {}", layer.filename().display());
+                total_physical_size += layer.path().metadata()?.len();
                 layers.insert_historic(Arc::new(layer));
                 num_layers += 1;
             } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
@@ -640,9 +686,10 @@ impl LayeredTimeline {
         layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1);
 
         info!(
-            "loaded layer map with {} layers at {}",
-            num_layers, disk_consistent_lsn
+            "loaded layer map with {} layers at {}, total physical size: {}",
+            num_layers, disk_consistent_lsn, total_physical_size
         );
+        self.current_physical_size_gauge.set(total_physical_size);
 
         Ok(())
     }
@@ -1203,8 +1250,12 @@ impl LayeredTimeline {
             layers.insert_historic(Arc::new(new_delta));
         }
 
+        // update the timeline's physical size
+        let sz = new_delta_path.metadata()?.len();
+        self.current_physical_size_gauge.add(sz);
+
         // update metrics
         NUM_PERSISTENT_FILES_CREATED.inc_by(1);
-        PERSISTENT_BYTES_WRITTEN.inc_by(new_delta_path.metadata()?.len());
+        PERSISTENT_BYTES_WRITTEN.inc_by(sz);
 
         Ok(new_delta_path)
     }
@@ -1390,6 +1441,8 @@ impl LayeredTimeline {
         let mut layers = self.layers.write().unwrap();
         for l in image_layers {
+            self.current_physical_size_gauge
+                .add(l.path().metadata()?.len());
             layers.insert_historic(Arc::new(l));
         }
         drop(layers);
@@ -1635,19 +1688,27 @@ impl LayeredTimeline {
         let mut layers = self.layers.write().unwrap();
         let mut new_layer_paths = HashSet::with_capacity(new_layers.len());
         for l in new_layers {
-            new_layer_paths.insert(l.path());
+            let new_delta_path = l.path();
+
+            // update the timeline's physical size
+            self.current_physical_size_gauge
+                .add(new_delta_path.metadata()?.len());
+
+            new_layer_paths.insert(new_delta_path);
             layers.insert_historic(Arc::new(l));
         }
 
         // Now that we have reshuffled the data to set of new delta layers, we can
         // delete the old ones
         let mut layer_paths_do_delete = HashSet::with_capacity(deltas_to_compact.len());
-        for l in &deltas_to_compact {
-            l.delete()?;
+        drop(all_keys_iter);
+        for l in deltas_to_compact {
             if let Some(path) = l.local_path() {
+                self.current_physical_size_gauge.sub(path.metadata()?.len());
                 layer_paths_do_delete.insert(path);
             }
-            layers.remove_historic(l.clone());
+            l.delete()?;
+            layers.remove_historic(l);
         }
         drop(layers);
@@ -1899,10 +1960,11 @@ impl LayeredTimeline {
         // while iterating it. BTreeMap::retain() would be another option)
         let mut layer_paths_to_delete = HashSet::with_capacity(layers_to_remove.len());
         for doomed_layer in layers_to_remove {
-            doomed_layer.delete()?;
             if let Some(path) = doomed_layer.local_path() {
+                self.current_physical_size_gauge.sub(path.metadata()?.len());
                 layer_paths_to_delete.insert(path);
             }
+            doomed_layer.delete()?;
             layers.remove_historic(doomed_layer);
             result.layers_removed += 1;
         }
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 61058a7806..0ca8c6150c 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -382,6 +382,11 @@ pub trait Timeline: Send + Sync {
         lsn: Lsn,
         latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
     ) -> Result<()>;
+
+    /// Get the physical size of the timeline at the latest LSN
+    fn get_physical_size(&self) -> u64;
+    /// Get the physical size of the timeline at the latest LSN, computed non-incrementally
+    fn get_physical_size_non_incremental(&self) -> Result<u64>;
 }
 
 /// Various functions to mutate the timeline.
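The two trait methods are intended to agree: `get_physical_size` reads the incrementally maintained gauge, while `get_physical_size_non_incremental` re-walks the timeline directory. A self-contained sketch of the invariant the new Python tests assert after checkpoint, compaction, and GC — with a hypothetical stand-in trait, since the real `Timeline` trait carries many more methods:

```rust
use anyhow::Result;

// Stand-in mirroring the two new methods on the pageserver's Timeline trait.
trait PhysicalSize {
    fn get_physical_size(&self) -> u64;
    fn get_physical_size_non_incremental(&self) -> Result<u64>;
}

// The property the new tests check: the incrementally maintained gauge
// must equal a fresh scan of the layer files on disk.
fn check_physical_size_consistent<T: PhysicalSize>(timeline: &T) -> Result<()> {
    anyhow::ensure!(
        timeline.get_physical_size() == timeline.get_physical_size_non_incremental()?,
        "incremental physical size diverged from on-disk layer total"
    );
    Ok(())
}
```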
diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs
index 984276bad2..1088e516aa 100644
--- a/pageserver/src/timelines.rs
+++ b/pageserver/src/timelines.rs
@@ -49,7 +49,9 @@ pub struct LocalTimelineInfo {
     #[serde_as(as = "DisplayFromStr")]
     pub disk_consistent_lsn: Lsn,
     pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
+    pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
     pub current_logical_size_non_incremental: Option<usize>,
+    pub current_physical_size_non_incremental: Option<u64>,
     pub timeline_state: LocalTimelineState,
 }
 
@@ -57,6 +59,7 @@ impl LocalTimelineInfo {
     pub fn from_loaded_timeline(
         timeline: &TimelineImpl,
         include_non_incremental_logical_size: bool,
+        include_non_incremental_physical_size: bool,
     ) -> anyhow::Result<Self> {
         let last_record_lsn = timeline.get_last_record_lsn();
         let info = LocalTimelineInfo {
@@ -72,12 +75,18 @@ impl LocalTimelineInfo {
             prev_record_lsn: Some(timeline.get_prev_record_lsn()),
             latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
             timeline_state: LocalTimelineState::Loaded,
+            current_physical_size: Some(timeline.get_physical_size()),
             current_logical_size: Some(timeline.get_current_logical_size()),
             current_logical_size_non_incremental: if include_non_incremental_logical_size {
                 Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
             } else {
                 None
             },
+            current_physical_size_non_incremental: if include_non_incremental_physical_size {
+                Some(timeline.get_physical_size_non_incremental()?)
+            } else {
+                None
+            },
         };
         Ok(info)
     }
@@ -97,7 +106,9 @@ impl LocalTimelineInfo {
             latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
             timeline_state: LocalTimelineState::Unloaded,
             current_logical_size: None,
+            current_physical_size: None,
             current_logical_size_non_incremental: None,
+            current_physical_size_non_incremental: None,
         }
     }
 
@@ -106,11 +117,16 @@ impl LocalTimelineInfo {
         timeline_id: ZTimelineId,
         repo_timeline: &RepositoryTimeline,
         include_non_incremental_logical_size: bool,
+        include_non_incremental_physical_size: bool,
     ) -> anyhow::Result<Self> {
         match repo_timeline {
             RepositoryTimeline::Loaded(_) => {
                 let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
-                Self::from_loaded_timeline(&*timeline, include_non_incremental_logical_size)
+                Self::from_loaded_timeline(
+                    &*timeline,
+                    include_non_incremental_logical_size,
+                    include_non_incremental_physical_size,
+                )
             }
             RepositoryTimeline::Unloaded { metadata } => Ok(Self::from_unloaded_timeline(metadata)),
         }
@@ -320,6 +336,7 @@ fn bootstrap_timeline(
 pub(crate) fn get_local_timelines(
     tenant_id: ZTenantId,
     include_non_incremental_logical_size: bool,
+    include_non_incremental_physical_size: bool,
 ) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
     let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
         .with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;
@@ -334,6 +351,7 @@ pub(crate) fn get_local_timelines(
                 timeline_id,
                 &repository_timeline,
                 include_non_incremental_logical_size,
+                include_non_incremental_physical_size,
             )?,
         ))
     }
@@ -387,7 +405,7 @@ pub(crate) fn create_timeline(
             // load the timeline into memory
             let loaded_timeline =
                 tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
-            LocalTimelineInfo::from_loaded_timeline(&*loaded_timeline, false)
+            LocalTimelineInfo::from_loaded_timeline(&*loaded_timeline, false, false)
                 .context("cannot fill timeline info")?
         }
         None => {
@@ -395,7 +413,7 @@
             // load the timeline into memory
             let new_timeline =
                 tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
-            LocalTimelineInfo::from_loaded_timeline(&*new_timeline, false)
+            LocalTimelineInfo::from_loaded_timeline(&*new_timeline, false, false)
                 .context("cannot fill timeline info")?
        }
    };
diff --git a/test_runner/batch_others/test_timeline_size.py b/test_runner/batch_others/test_timeline_size.py
index 7b7b16bcbf..c3788a0e9b 100644
--- a/test_runner/batch_others/test_timeline_size.py
+++ b/test_runner/batch_others/test_timeline_size.py
@@ -1,10 +1,15 @@
 from contextlib import closing
+import pathlib
+from uuid import UUID
+import re
 
 import psycopg2.extras
 import psycopg2.errors
 from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, assert_timeline_local
 from fixtures.log_helper import log
 import time
 
+from fixtures.utils import get_timeline_dir_size
+
 
 def test_timeline_size(neon_simple_env: NeonEnv):
     env = neon_simple_env
@@ -176,3 +181,129 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
             cur.execute("SELECT * from pg_size_pretty(pg_cluster_size())")
             pg_cluster_size = cur.fetchone()
             log.info(f"pg_cluster_size = {pg_cluster_size}")
+
+
+def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_init')
+    pg = env.postgres.create_start("test_timeline_physical_size_init")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 1000) g""",
+    ])
+
+    # restart the pageserver to force calculating the timeline's initial physical size
+    env.pageserver.stop()
+    env.pageserver.start()
+
+    assert_physical_size(env, env.initial_tenant, new_timeline_id)
+
+
+def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_checkpoint')
+    pg = env.postgres.create_start("test_timeline_physical_size_post_checkpoint")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 1000) g""",
+    ])
+
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+    assert_physical_size(env, env.initial_tenant, new_timeline_id)
+
+
+def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder):
+    # Disable background compaction, since we don't want it to run after the
+    # `get_physical_size` request and before we check the expected size on disk,
+    # which would make the assertion fail
+    neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='10m'}"
+
+    env = neon_env_builder.init_start()
+
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_compaction')
+    pg = env.postgres.create_start("test_timeline_physical_size_post_compaction")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 100000) g""",
+    ])
+
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+    env.pageserver.safe_psql(f"compact {env.initial_tenant.hex} {new_timeline_id.hex}")
+    assert_physical_size(env, env.initial_tenant, new_timeline_id)
+
+
+def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
+    # Disable background compaction and GC, since we don't want them to run after the
+    # `get_physical_size` request and before we check the expected size on disk,
+    # which would make the assertion fail
+    neon_env_builder.pageserver_config_override = \
+        "tenant_config={checkpoint_distance=100000, compaction_period='10m', gc_period='10m', pitr_interval='1s'}"
+
+    env = neon_env_builder.init_start()
+
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_gc')
+    pg = env.postgres.create_start("test_timeline_physical_size_post_gc")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 100000) g""",
+    ])
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+
+    pg.safe_psql("""
+        INSERT INTO foo
+        SELECT 'long string to consume some space' || g
+        FROM generate_series(1, 100000) g
+    """)
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+
+    env.pageserver.safe_psql(f"do_gc {env.initial_tenant.hex} {new_timeline_id.hex} 0")
+    assert_physical_size(env, env.initial_tenant, new_timeline_id)
+
+
+def test_timeline_physical_size_metric(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+
+    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_metric')
+    pg = env.postgres.create_start("test_timeline_physical_size_metric")
+
+    pg.safe_psql_many([
+        "CREATE TABLE foo (t text)",
+        """INSERT INTO foo
+           SELECT 'long string to consume some space' || g
+           FROM generate_series(1, 100000) g""",
+    ])
+
+    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
+
+    # get the metrics and parse the metric for the current timeline's physical size
+    metrics = env.pageserver.http_client().get_metrics()
+    matches = re.search(
+        f'^pageserver_current_physical_size{{tenant_id="{env.initial_tenant.hex}",timeline_id="{new_timeline_id.hex}"}} (\\S+)$',
+        metrics,
+        re.MULTILINE)
+    assert matches
+
+    # assert that the metric matches the actual physical size on disk
+    tl_physical_size_metric = int(matches.group(1))
+    timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id)
+    assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)
+
+
+def assert_physical_size(env: NeonEnv, tenant_id: UUID, timeline_id: UUID):
+    """Check the current physical size returned from timeline API
+    matches the total physical size of the timeline on disk"""
+    client = env.pageserver.http_client()
+    res = assert_timeline_local(client, tenant_id, timeline_id)
+    timeline_path = env.timeline_dir(tenant_id, timeline_id)
+    assert res["local"]["current_physical_size"] == res["local"][
+        "current_physical_size_non_incremental"]
+    assert res["local"]["current_physical_size"] == get_timeline_dir_size(timeline_path)
pitr_interval='1s'}" + + env = neon_env_builder.init_start() + + new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_gc') + pg = env.postgres.create_start("test_timeline_physical_size_post_gc") + + pg.safe_psql_many([ + "CREATE TABLE foo (t text)", + """INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 100000) g""", + ]) + env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}") + pg.safe_psql(""" + INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 100000) g + """) + env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}") + + env.pageserver.safe_psql(f"do_gc {env.initial_tenant.hex} {new_timeline_id.hex} 0") + assert_physical_size(env, env.initial_tenant, new_timeline_id) + + +def test_timeline_physical_size_metric(neon_simple_env: NeonEnv): + env = neon_simple_env + + new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_metric') + pg = env.postgres.create_start("test_timeline_physical_size_metric") + + pg.safe_psql_many([ + "CREATE TABLE foo (t text)", + """INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 100000) g""", + ]) + + env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}") + + # get the metrics and parse the metric for the current timeline's physical size + metrics = env.pageserver.http_client().get_metrics() + matches = re.search( + f'^pageserver_current_physical_size{{tenant_id="{env.initial_tenant.hex}",timeline_id="{new_timeline_id.hex}"}} (\\S+)$', + metrics, + re.MULTILINE) + assert matches + + # assert that the metric matches the actual physical size on disk + tl_physical_size_metric = int(matches.group(1)) + timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id) + assert tl_physical_size_metric == get_timeline_dir_size(timeline_path) + + +def assert_physical_size(env: NeonEnv, tenant_id: UUID, timeline_id: UUID): + """Check the current physical size returned from timeline API + matches the total physical size of the timeline on disk""" + client = env.pageserver.http_client() + res = assert_timeline_local(client, tenant_id, timeline_id) + timeline_path = env.timeline_dir(tenant_id, timeline_id) + assert res["local"]["current_physical_size"] == res["local"][ + "current_physical_size_non_incremental"] + assert res["local"]["current_physical_size"] == get_timeline_dir_size(timeline_path) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b1fba29e3b..4913f0b456 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -691,6 +691,10 @@ class NeonEnv: """ Get list of safekeeper endpoints suitable for safekeepers GUC """ return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers]) + def timeline_dir(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Path: + """Get a timeline directory's path based on the repo directory of the test environment""" + return self.repo_dir / "tenants" / tenant_id.hex / "timelines" / timeline_id.hex + @cached_property def auth_keys(self) -> AuthKeys: pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes() @@ -863,8 +867,8 @@ class NeonPageserverHttpClient(requests.Session): def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]: res = self.get( - 
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1" - ) + f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}" + + "?include-non-incremental-logical-size=1&include-non-incremental-physical-size=1") self.verbose_error(res) res_json = res.json() assert isinstance(res_json, dict) diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index c49fa08d77..bc50a43ada 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -1,9 +1,11 @@ +import contextlib import os +import pathlib import shutil import subprocess from pathlib import Path -from typing import Any, List +from typing import Any, List, Tuple from fixtures.log_helper import log @@ -89,3 +91,36 @@ def get_dir_size(path: str) -> int: pass # file could be concurrently removed return totalbytes + + +def get_timeline_dir_size(path: pathlib.Path) -> int: + """Get the timeline directory's total size, which only counts the layer files' size.""" + sz = 0 + for dir_entry in path.iterdir(): + with contextlib.suppress(Exception): + # file is an image layer + _ = parse_image_layer(dir_entry.name) + sz += dir_entry.stat().st_size + continue + + with contextlib.suppress(Exception): + # file is a delta layer + _ = parse_delta_layer(dir_entry.name) + sz += dir_entry.stat().st_size + continue + return sz + + +def parse_image_layer(f_name: str) -> Tuple[int, int, int]: + """Parse an image layer file name. Return key start, key end, and snapshot lsn""" + parts = f_name.split("__") + key_parts = parts[0].split("-") + return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16) + + +def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]: + """Parse a delta layer file name. Return key start, key end, lsn start, and lsn end""" + parts = f_name.split("__") + key_parts = parts[0].split("-") + lsn_parts = parts[1].split("-") + return int(key_parts[0], 16), int(key_parts[1], 16), int(lsn_parts[0], 16), int(lsn_parts[1], 16)