diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index d25a73ef25..00b7340de5 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::net::TcpStream; use std::path::PathBuf; use std::process::Command; @@ -230,7 +229,7 @@ impl PageServerNode { tenantid: &ZTenantId, ) -> Result { Ok(self - .http_request(Method::POST, format!("{}/{}", self.http_base_url, "branch")) + .http_request(Method::POST, format!("{}/branch", self.http_base_url)) .json(&BranchCreateRequest { tenant_id: tenantid.to_owned(), name: branch_name.to_owned(), @@ -241,24 +240,19 @@ impl PageServerNode { .json()?) } - // TODO: make this a separate request type and avoid loading all the branches pub fn branch_get_by_name( &self, tenantid: &ZTenantId, branch_name: &str, ) -> Result { - let branch_infos = self.branch_list(tenantid)?; - let branch_by_name: Result> = branch_infos - .into_iter() - .map(|branch_info| Ok((branch_info.name.clone(), branch_info))) - .collect(); - let branch_by_name = branch_by_name?; - - let branch = branch_by_name - .get(branch_name) - .ok_or_else(|| anyhow!("Branch {} not found", branch_name))?; - - Ok(branch.clone()) + Ok(self + .http_request( + Method::GET, + format!("{}/branch/{}/{}", self.http_base_url, tenantid, branch_name), + ) + .send()? + .error_for_status()? + .json()?) 
} } diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index e8ed47a1df..b7cff72477 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -30,9 +30,64 @@ pub struct BranchInfo { pub name: String, #[serde(with = "hex")] pub timeline_id: ZTimelineId, - pub latest_valid_lsn: Option, + pub latest_valid_lsn: Lsn, pub ancestor_id: Option, pub ancestor_lsn: Option, + pub current_logical_size: usize, + pub current_logical_size_non_incremental: usize, +} + +impl BranchInfo { + pub fn from_path>( + path: T, + conf: &PageServerConf, + tenantid: &ZTenantId, + repo: &Arc, + ) -> Result { + let name = path + .as_ref() + .file_name() + .unwrap() + .to_str() + .unwrap() + .to_string(); + let timeline_id = std::fs::read_to_string(path)?.parse::()?; + + let timeline = repo.get_timeline(timeline_id)?; + + let ancestor_path = conf.ancestor_path(&timeline_id, tenantid); + let mut ancestor_id: Option = None; + let mut ancestor_lsn: Option = None; + + if ancestor_path.exists() { + let ancestor = std::fs::read_to_string(ancestor_path)?; + let mut strings = ancestor.split('@'); + + ancestor_id = Some( + strings + .next() + .with_context(|| "wrong branch ancestor point in time format")? + .to_owned(), + ); + ancestor_lsn = Some( + strings + .next() + .with_context(|| "wrong branch ancestor point in time format")? + .to_owned(), + ); + } + + Ok(BranchInfo { + name, + timeline_id, + latest_valid_lsn: timeline.get_last_record_lsn(), + ancestor_id, + ancestor_lsn, + current_logical_size: timeline.get_current_logical_size(), + current_logical_size_non_incremental: timeline + .get_current_logical_size_non_incremental(timeline.get_last_record_lsn())?, + }) + } } #[derive(Debug, Clone, Copy)] @@ -210,43 +265,12 @@ pub(crate) fn get_branches(conf: &PageServerConf, tenantid: &ZTenantId) -> Resul std::fs::read_dir(&branches_dir)? 
.map(|dir_entry_res| { let dir_entry = dir_entry_res?; - let name = dir_entry.file_name().to_str().unwrap().to_string(); - let timeline_id = std::fs::read_to_string(dir_entry.path())?.parse::()?; - - let latest_valid_lsn = repo - .get_timeline(timeline_id) - .map(|timeline| timeline.get_last_record_lsn()) - .ok(); - - let ancestor_path = conf.ancestor_path(&timeline_id, tenantid); - let mut ancestor_id: Option = None; - let mut ancestor_lsn: Option = None; - - if ancestor_path.exists() { - let ancestor = std::fs::read_to_string(ancestor_path)?; - let mut strings = ancestor.split('@'); - - ancestor_id = Some( - strings - .next() - .with_context(|| "wrong branch ancestor point in time format")? - .to_owned(), - ); - ancestor_lsn = Some( - strings - .next() - .with_context(|| "wrong branch ancestor point in time format")? - .to_owned(), - ); - } - - Ok(BranchInfo { - name, - timeline_id, - latest_valid_lsn, - ancestor_id, - ancestor_lsn, - }) + Ok(BranchInfo::from_path( + dir_entry.path(), + conf, + tenantid, + &repo, + )?) 
}) .collect() } @@ -296,9 +320,11 @@ pub(crate) fn create_branch( Ok(BranchInfo { name: branchname.to_string(), timeline_id: newtli, - latest_valid_lsn: Some(startpoint.lsn), + latest_valid_lsn: startpoint.lsn, ancestor_id: None, ancestor_lsn: None, + current_logical_size: 0, + current_logical_size_non_incremental: 0, }) } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index b4f61cb709..b173862146 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -54,7 +54,52 @@ paths: application/json: schema: $ref: "#/components/schemas/ForbiddenError" - + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + /v1/branch/{tenant_id}/{branch_name}: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + format: hex + - name: branch_name + in: path + required: true + schema: + type: string + get: + description: Get branches for tenant + responses: + "200": + description: BranchInfo + content: + application/json: + schema: + $ref: "#/components/schemas/BranchInfo" + "400": + description: Error when no tenant id found in path or no branch name + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" "500": description: Generic operation error content: @@ -203,6 +248,9 @@ components: required: - name - timeline_id + - latest_valid_lsn + - current_logical_size + - current_logical_size_non_incremental properties: name: type: string @@ -213,6 +261,10 @@ components: type: string ancestor_lsn: type: string + current_logical_size: + type: integer + current_logical_size_non_incremental: + type: integer Error: type: object 
required: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ea81e1a8a3..c5b8a42499 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1,3 +1,4 @@ +use std::str::FromStr; use std::sync::Arc; use anyhow::Result; @@ -18,10 +19,8 @@ use zenith_utils::http::{ use super::models::BranchCreateRequest; use super::models::TenantCreateRequest; -use crate::{ - branches::{self}, - tenant_mgr, PageServerConf, ZTenantId, -}; +use crate::branches::BranchInfo; +use crate::{branches, tenant_mgr, PageServerConf, ZTenantId}; #[derive(Debug)] struct State { @@ -57,6 +56,35 @@ fn get_config(request: &Request) -> &'static PageServerConf { get_state(request).conf } +fn get_request_param<'a>( + request: &'a Request, + param_name: &str, +) -> Result<&'a str, ApiError> { + match request.param(param_name) { + Some(arg) => Ok(arg), + None => { + return Err(ApiError::BadRequest(format!( + "no {} specified in path param", + param_name + ))) + } + } +} + +fn parse_request_param( + request: &Request, + param_name: &str, +) -> Result { + match get_request_param(request, param_name)?.parse() { + Ok(v) => Ok(v), + Err(_) => { + return Err(ApiError::BadRequest( + "failed to parse tenant id".to_string(), + )) + } + } +} + // healthcheck handler async fn status_handler(_: Request) -> Result, ApiError> { Ok(Response::builder() @@ -85,16 +113,7 @@ async fn branch_create_handler(mut request: Request) -> Result) -> Result, ApiError> { - let tenantid: ZTenantId = match request.param("tenant_id") { - Some(arg) => arg - .parse() - .map_err(|_| ApiError::BadRequest("failed to parse tenant id".to_string()))?, - None => { - return Err(ApiError::BadRequest( - "no tenant id specified in path param".to_string(), - )) - } - }; + let tenantid: ZTenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenantid))?; @@ -106,6 +125,23 @@ async fn branch_list_handler(request: Request) -> Result, A Ok(json_response(StatusCode::OK, 
response_data)?) } +// TODO add to swagger +async fn branch_detail_handler(request: Request) -> Result, ApiError> { + let tenantid: ZTenantId = parse_request_param(&request, "tenant_id")?; + let branch_name: &str = get_request_param(&request, "branch_name")?; + let conf = get_state(&request).conf; + let path = conf.branch_path(&branch_name, &tenantid); + + let response_data = tokio::task::spawn_blocking(move || { + let repo = tenant_mgr::get_repository_for_tenant(&tenantid)?; + BranchInfo::from_path(path, conf, &tenantid, &repo) + }) + .await + .map_err(ApiError::from_err)??; + + Ok(json_response(StatusCode::OK, response_data)?) +} + async fn tenant_list_handler(request: Request) -> Result, ApiError> { // check for management permission check_permission(&request, None)?; @@ -159,6 +195,7 @@ pub fn make_router( .data(Arc::new(State::new(conf, auth))) .get("/v1/status", status_handler) .get("/v1/branch/:tenant_id", branch_list_handler) + .get("/v1/branch/:tenant_id/:branch_name", branch_detail_handler) .post("/v1/branch", branch_create_handler) .get("/v1/tenant", tenant_list_handler) .post("/v1/tenant", tenant_create_handler) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 7e20affbd9..4eda43553b 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -15,18 +15,20 @@ use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; use lazy_static::lazy_static; use log::*; +use postgres_ffi::pg_constants::BLCKSZ; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::collections::{BTreeSet, HashSet}; -use std::fs; use std::fs::File; use std::io::Write; use std::ops::Bound::Included; use std::path::Path; use std::str::FromStr; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use std::{fs, thread}; use crate::relish::*; use crate::repository::{GcResult, Repository, Timeline, WALRecord}; @@ -35,7 
+37,7 @@ use crate::walredo::WalRedoManager; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; -use zenith_metrics::{register_histogram, Histogram}; +use zenith_metrics::{register_histogram, register_int_gauge_vec, Histogram, IntGaugeVec}; use zenith_metrics::{register_histogram_vec, HistogramVec}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn}; @@ -92,6 +94,16 @@ lazy_static! { .expect("failed to define a metric"); } +lazy_static! { + // NOTE: can be zero if pageserver was restarted and no activity is happening + static ref LOGICAL_TIMELINE_SIZE: IntGaugeVec = register_int_gauge_vec!( + "pageserver_logical_timeline_size", + "Logical timeline size (bytes)", + &["tenant_id", "timeline_id"] + ) + .expect("failed to define a metric"); +} + /// /// Repository consists of multiple timelines. Keep them in a hash table. /// @@ -138,6 +150,7 @@ impl Repository for LayeredRepository { timelineid, self.tenantid, self.walredo_mgr.clone(), + 0, )?; let timeline_rc = Arc::new(timeline); @@ -221,18 +234,24 @@ impl LayeredRepository { None }; - let timeline = LayeredTimeline::new( + let mut timeline = LayeredTimeline::new( self.conf, metadata, ancestor, timelineid, self.tenantid, self.walredo_mgr.clone(), + 0, // init with 0 and update after layers are loaded )?; // List the layers on disk, and load them into the layer map timeline.load_layer_map()?; + // needs to be after load_layer_map + timeline.init_current_logical_size()?; + + let timeline = Arc::new(timeline); + // Load any new WAL after the last checkpoint into memory.
info!( "Loading WAL for timeline {} starting at {}", @@ -243,11 +262,15 @@ impl LayeredRepository { .conf .timeline_path(&timelineid, &self.tenantid) .join("wal"); - import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?; + import_timeline_wal(&wal_dir, timeline.as_ref(), timeline.get_last_record_lsn())?; - let timeline_rc = Arc::new(timeline); - timelines.insert(timelineid, timeline_rc.clone()); - Ok(timeline_rc) + if cfg!(debug_assertions) { + // check again after wal loading + Self::assert_size_calculation_matches_offloaded(Arc::clone(&timeline)); + } + + timelines.insert(timelineid, timeline.clone()); + Ok(timeline.clone()) } } } @@ -445,6 +468,24 @@ impl LayeredRepository { totals.elapsed = now.elapsed(); Ok(totals) } + + fn assert_size_calculation_matches(incremental: usize, timeline: &LayeredTimeline) { + match timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) { + Ok(non_incremental) => { + if incremental != non_incremental { + error!("timeline size calculation diverged, incremental doesn't match non incremental. incremental={} non_incremental={}", incremental, non_incremental); + } + } + Err(e) => error!("failed to calculate non incremental timeline size: {}", e), + } + } + + fn assert_size_calculation_matches_offloaded(timeline: Arc) { + let incremental = timeline.get_current_logical_size(); + thread::spawn(move || { + Self::assert_size_calculation_matches(incremental, &timeline); + }); + } } /// Metadata stored on disk for each timeline @@ -509,6 +550,16 @@ pub struct LayeredTimeline { // of the branch point. ancestor_timeline: Option>, ancestor_lsn: Lsn, + + // this variable indicates how much space is used from user's point of view, + // e.g. we do not account here for multiple versions of data and so on. 
+ // this is counted incrementally based on physical relishes (excluding FileNodeMap) + // current_logical_size is not stored on disk and initialized on timeline creation using + // get_current_logical_size_non_incremental in init_current_logical_size + // this is needed because when we save it in metadata it can become out of sync + // because current_logical_size is consistent on last_record_lsn, not ondisk_consistent_lsn + // NOTE: current_logical_size also includes size of the ancestor + current_logical_size: AtomicUsize, // bytes } /// Public interface functions @@ -654,9 +705,9 @@ impl Timeline for LayeredTimeline { } let seg = SegmentTag::from_blknum(rel, blknum); - let layer = self.get_layer_for_write(seg, rec.lsn)?; - layer.put_wal_record(blknum, rec) + self.increase_current_logical_size(layer.put_wal_record(blknum, rec)? * BLCKSZ as u32); + Ok(()) } fn put_truncation(&self, rel: RelishTag, lsn: Lsn, relsize: u32) -> anyhow::Result<()> { @@ -706,7 +757,7 @@ let layer = self.get_layer_for_write(seg, lsn)?; layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)?; } - + self.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32); Ok(()) } @@ -714,8 +765,7 @@ trace!("drop_segment: {} at {}", rel, lsn); if rel.is_blocky() { - let oldsize_opt = self.get_relish_size(rel, self.get_last_record_lsn())?; - if let Some(oldsize) = oldsize_opt { + if let Some(oldsize) = self.get_relish_size(rel, self.get_last_record_lsn())?
{ let old_last_seg = if oldsize == 0 { 0 } else { @@ -731,6 +781,7 @@ impl Timeline for LayeredTimeline { let layer = self.get_layer_for_write(seg, lsn)?; layer.drop_segment(lsn)?; } + self.decrease_current_logical_size(oldsize * BLCKSZ as u32); } else { warn!( "drop_segment called on non-existent relish {} at {}", @@ -738,6 +789,7 @@ impl Timeline for LayeredTimeline { ); } } else { + // TODO handle TwoPhase relishes let seg = SegmentTag::from_blknum(rel, 0); let layer = self.get_layer_for_write(seg, lsn)?; layer.drop_segment(lsn)?; @@ -758,7 +810,8 @@ impl Timeline for LayeredTimeline { let seg = SegmentTag::from_blknum(rel, blknum); let layer = self.get_layer_for_write(seg, lsn)?; - layer.put_page_image(blknum, lsn, img) + self.increase_current_logical_size(layer.put_page_image(blknum, lsn, img)? * BLCKSZ as u32); + Ok(()) } /// Public entry point for checkpoint(). All the logic is in the private @@ -802,6 +855,35 @@ impl Timeline for LayeredTimeline { self.ancestor_lsn } } + + fn get_current_logical_size(&self) -> usize { + self.current_logical_size.load(Ordering::Acquire) as usize + } + + fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result { + let mut total_blocks: usize = 0; + + // list of all relations in this timeline, including ancestor timelines + let all_rels = self.list_rels(0, 0, lsn)?; + + for rel in all_rels { + if let Some(size) = self.get_relish_size(RelishTag::Relation(rel), lsn)? { + total_blocks += size as usize; + } + } + + let non_rels = self.list_nonrels(lsn)?; + for non_rel in non_rels { + // TODO support TwoPhase + if matches!(non_rel, RelishTag::Slru { slru: _, segno: _ }) { + if let Some(size) = self.get_relish_size(non_rel, lsn)? 
{ + total_blocks += size as usize; + } + } + } + + Ok(total_blocks * BLCKSZ as usize) + } } impl LayeredTimeline { @@ -815,6 +897,7 @@ impl LayeredTimeline { timelineid: ZTimelineId, tenantid: ZTenantId, walredo_mgr: Arc, + current_logical_size: usize, ) -> Result { let timeline = LayeredTimeline { conf, @@ -833,6 +916,7 @@ impl LayeredTimeline { ancestor_timeline: ancestor, ancestor_lsn: metadata.ancestor_lsn, + current_logical_size: AtomicUsize::new(current_logical_size), }; Ok(timeline) } @@ -896,6 +980,23 @@ impl LayeredTimeline { Ok(()) } + /// + /// Used to init current logical size on startup + /// + fn init_current_logical_size(&mut self) -> Result<()> { + if self.current_logical_size.load(Ordering::Relaxed) != 0 { + bail!("cannot init already initialized current logical size") + }; + let lsn = self.get_last_record_lsn(); + self.current_logical_size = + AtomicUsize::new(self.get_current_logical_size_non_incremental(lsn)?); + trace!( + "current_logical_size initialized to {}", + self.current_logical_size.load(Ordering::Relaxed) + ); + Ok(()) + } + /// /// Get a handle to a Layer for reading. 
/// @@ -1455,6 +1556,42 @@ impl LayeredTimeline { } } } + + /// + /// This is a helper function to increase current_total_relation_size + /// + fn increase_current_logical_size(&self, diff: u32) { + let val = self + .current_logical_size + .fetch_add(diff as usize, Ordering::SeqCst); + trace!( + "increase_current_logical_size: {} + {} = {}", + val, + diff, + val + diff as usize, + ); + LOGICAL_TIMELINE_SIZE + .with_label_values(&[&self.tenantid.to_string(), &self.timelineid.to_string()]) + .set(val as i64 + diff as i64) + } + + /// + /// This is a helper function to decrease current_total_relation_size + /// + fn decrease_current_logical_size(&self, diff: u32) { + let val = self + .current_logical_size + .fetch_sub(diff as usize, Ordering::SeqCst); + trace!( + "decrease_current_logical_size: {} - {} = {}", + val, + diff, + val - diff as usize, + ); + LOGICAL_TIMELINE_SIZE + .with_label_values(&[&self.tenantid.to_string(), &self.timelineid.to_string()]) + .set(val as i64 - diff as i64) + } } /// Dump contents of a layer file to stdout. 
diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 361d47f0bb..0b2f24e555 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -310,7 +310,7 @@ impl InMemoryLayer { // Write operations /// Remember new page version, as a WAL record over previous version - pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result<()> { + pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result { self.put_page_version( blknum, rec.lsn, @@ -322,7 +322,7 @@ impl InMemoryLayer { } /// Remember new page version, as a full page image - pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> { + pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result { self.put_page_version( blknum, lsn, @@ -335,7 +335,7 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. 
/// Adds the page version to the in-memory tree - pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()> { + pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result { assert!(self.seg.blknum_in_seg(blknum)); trace!( @@ -403,15 +403,20 @@ } inner.segsizes.insert(lsn, newsize); + return Ok(newsize - oldsize); } } - - Ok(()) + Ok(0) } /// Remember that the relation was truncated at given LSN pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> anyhow::Result<()> { let mut inner = self.inner.lock().unwrap(); + + // check that we truncate to a smaller size than the segment was before the truncation + let oldsize = inner.get_seg_size(lsn); + assert!(segsize < oldsize); + let old = inner.segsizes.insert(lsn, segsize); if old.is_some() { diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 310a8f67e9..2ffda7d8e1 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -45,9 +45,6 @@ pub trait Repository: Send + Sync { horizon: u64, compact: bool, ) -> Result; - - // TODO get timelines? - //fn get_stats(&self) -> RepositoryStats; } /// @@ -133,7 +130,7 @@ pub trait Timeline: Send + Sync { /// Truncate relation fn put_truncation(&self, rel: RelishTag, lsn: Lsn, nblocks: u32) -> Result<()>; - /// This method is used for marking dropped relations and truncated SLRU files + /// This method is used for marking dropped relations and truncated SLRU files and aborted two phase records fn drop_relish(&self, tag: RelishTag, lsn: Lsn) -> Result<()>; /// Track end of the latest digested WAL record. @@ -154,14 +151,16 @@ pub trait Timeline: Send + Sync { /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't /// know anything about them here in the repository.
fn checkpoint(&self) -> Result<()>; -} -#[derive(Clone)] -pub struct RepositoryStats { - pub num_entries: Lsn, - pub num_page_images: Lsn, - pub num_wal_records: Lsn, - pub num_getpage_requests: Lsn, + /// Retrieve current logical size of the timeline + /// + /// NOTE: counted incrementally, includes ancestors, + /// doesn't support TwoPhase relishes yet + fn get_current_logical_size(&self) -> usize; + + /// Does the same as get_current_logical_size but counted on demand. + /// Used in tests to ensure that incremental and non incremental variants match. + fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result; } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -239,6 +238,14 @@ mod tests { buf.freeze() } + fn assert_current_logical_size(timeline: &Arc, lsn: Lsn) { + let incremental = timeline.get_current_logical_size(); + let non_incremental = timeline + .get_current_logical_size_non_incremental(lsn) + .unwrap(); + assert_eq!(incremental, non_incremental); + } + static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); fn get_test_repo(test_name: &str) -> Result> { @@ -294,6 +301,8 @@ mod tests { tline.advance_last_record_lsn(Lsn(0x50)); + assert_current_logical_size(&tline, Lsn(0x50)); + // The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false); assert!(tline.get_relish_size(TESTREL_A, Lsn(0x10))?.is_none()); @@ -338,6 +347,7 @@ mod tests { // Truncate last block tline.put_truncation(TESTREL_A, Lsn(0x60), 2)?; tline.advance_last_record_lsn(Lsn(0x60)); + assert_current_logical_size(&tline, Lsn(0x60)); // Check reported size and contents after truncation assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x60))?.unwrap(), 2); @@ -407,6 +417,8 @@ mod tests { } tline.advance_last_record_lsn(Lsn(lsn)); + assert_current_logical_size(&tline, Lsn(lsn)); + assert_eq!( tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(), pg_constants::RELSEG_SIZE + 1 @@ -420,6 +432,7 @@ mod tests { tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(), pg_constants::RELSEG_SIZE ); + assert_current_logical_size(&tline, Lsn(lsn)); // Truncate another block lsn += 0x10; @@ -429,6 +442,7 @@ mod tests { tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(), pg_constants::RELSEG_SIZE - 1 ); + assert_current_logical_size(&tline, Lsn(lsn)); // Truncate to 1500, and then truncate all the way down to 0, one block at a time // This tests the behavior at segment boundaries @@ -444,6 +458,7 @@ mod tests { size -= 1; } + assert_current_logical_size(&tline, Lsn(lsn)); Ok(()) } @@ -470,6 +485,7 @@ mod tests { tline.put_page_image(TESTREL_B, 0, Lsn(0x20), TEST_IMG("foobar blk 0 at 2"))?; tline.advance_last_record_lsn(Lsn(0x40)); + assert_current_logical_size(&tline, Lsn(0x40)); // Branch the history, modify relation differently on the new timeline let newtimelineid = ZTimelineId::from_str("AA223344556677881122334455667788").unwrap(); @@ -497,6 +513,8 @@ mod tests { assert_eq!(newtline.get_relish_size(TESTREL_B, Lsn(0x40))?.unwrap(), 1); + assert_current_logical_size(&tline, Lsn(0x40)); + Ok(()) } diff --git a/test_runner/batch_others/test_timeline_size.py b/test_runner/batch_others/test_timeline_size.py new file mode 100644 index 0000000000..45b0c98d40 --- /dev/null +++ 
b/test_runner/batch_others/test_timeline_size.py @@ -0,0 +1,39 @@ +from contextlib import closing +from uuid import UUID +import psycopg2.extras +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + + +def test_timeline_size( + zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin +): + # Create a new branch from the 'empty' timeline + zenith_cli.run(["branch", "test_timeline_size", "empty"]) + + client = pageserver.http_client() + res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size") + assert res["current_logical_size"] == res["current_logical_size_non_incremental"] + + pgmain = postgres.create_start("test_timeline_size") + print("postgres is running on 'test_timeline_size' branch") + + with closing(pgmain.connect()) as conn: + with conn.cursor() as cur: + cur.execute("SHOW zenith.zenith_timeline") + + # Create a table and insert some rows + cur.execute("CREATE TABLE foo (t text)") + cur.execute( + """ + INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 10) g + """ + ) + + res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size") + assert res["current_logical_size"] == res["current_logical_size_non_incremental"] + cur.execute("TRUNCATE foo") + + res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size") + assert res["current_logical_size"] == res["current_logical_size_non_incremental"] diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 029733da6a..39d7a6690d 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -224,6 +224,13 @@ class ZenithPageserverHttpClient(requests.Session): res.raise_for_status() return res.json() + def branch_detail(self, tenant_id: uuid.UUID, name: str) -> Dict: + res = self.get( + f"http://localhost:{self.port}/v1/branch/{tenant_id.hex}/{name}", + ) +
res.raise_for_status() + return res.json() + def tenant_list(self) -> List[str]: res = self.get(f"http://localhost:{self.port}/v1/tenant") res.raise_for_status() diff --git a/zenith/src/main.rs b/zenith/src/main.rs index d73c7eed27..ff9337579d 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -13,7 +13,6 @@ use zenith_utils::postgres_backend::AuthType; use zenith_utils::zid::{ZTenantId, ZTimelineId}; use pageserver::branches::BranchInfo; -use zenith_utils::lsn::Lsn; /// /// Branches tree element used as a value in the HashMap. @@ -393,9 +392,7 @@ fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result let branch = pageserver.branch_create(branchname, startpoint_str, &tenantid)?; println!( "Created branch '{}' at {:?} for tenant: {}", - branch.name, - branch.latest_valid_lsn.unwrap_or(Lsn(0)), - tenantid, + branch.name, branch.latest_valid_lsn, tenantid, ); } else { let tenantid: ZTenantId = branch_match @@ -434,9 +431,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { node.address, branch_infos .get(&node.timelineid) - .map(|bi| bi - .latest_valid_lsn - .map_or("?".to_string(), |lsn| lsn.to_string())) + .map(|bi| bi.latest_valid_lsn.to_string()) .unwrap_or_else(|| "?".to_string()), node.status(), );