diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 246d012e8d..912f1d7af0 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -23,11 +23,11 @@ use zenith_utils::{crashsafe_dir, logging}; use crate::config::PageServerConf; use crate::pgdatadir_mapping::DatadirTimeline; use crate::repository::{Repository, Timeline}; +use crate::tenant_mgr; use crate::walredo::WalRedoManager; use crate::CheckpointConfig; use crate::RepositoryImpl; use crate::{import_datadir, LOG_FILE_NAME}; -use crate::{repository::RepositoryTimeline, tenant_mgr}; #[derive(Serialize, Deserialize, Clone)] pub struct BranchInfo { @@ -42,10 +42,10 @@ pub struct BranchInfo { } impl BranchInfo { - pub fn from_path>( + pub fn from_path>( path: T, - repo: &R, - _include_non_incremental_logical_size: bool, + tenantid: ZTenantId, + include_non_incremental_logical_size: bool, ) -> Result { let path = path.as_ref(); let name = path.file_name().unwrap().to_string_lossy().to_string(); @@ -58,35 +58,35 @@ impl BranchInfo { })? .parse::()?; - let timeline = match repo.get_timeline(timeline_id)? { - RepositoryTimeline::Local(local_entry) => local_entry, - RepositoryTimeline::Remote { .. } => { - bail!("Timeline {} is remote, no branches to display", timeline_id) + let timeline = match tenant_mgr::get_timeline_for_tenant(tenantid, timeline_id) { + Ok(timeline) => timeline, + Err(err) => { + // FIXME: this was: + // bail!("Timeline {} is remote, no branches to display", timeline_id) + // + // but we cannot distinguish that from other errors now. Have + // get_timeline_for_tenant() return a more specific error + return Err(err); } }; // we use ancestor lsn zero if we don't have an ancestor, so turn this into an option based on timeline id - let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() { + let (ancestor_id, ancestor_lsn) = match timeline.tline.get_ancestor_timeline_id() { Some(ancestor_id) => ( Some(ancestor_id.to_string()), - Some(timeline.get_ancestor_lsn().to_string()), + Some(timeline.tline.get_ancestor_lsn().to_string()), ), None => (None, None), }; // non incremental size calculation can be heavy, so let it be optional // needed for tests to check size calculation - // - // FIXME - /* let current_logical_size_non_incremental = include_non_incremental_logical_size .then(|| { timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) }) .transpose()?; - */ - let current_logical_size_non_incremental = Some(0); - let current_logical_size = 0; + let current_logical_size = timeline.get_current_logical_size(); Ok(BranchInfo { name, @@ -94,7 +94,7 @@ impl BranchInfo { latest_valid_lsn: timeline.get_last_record_lsn(), ancestor_id, ancestor_lsn, - current_logical_size, // : timeline.get_current_logical_size(), + current_logical_size, current_logical_size_non_incremental, }) } @@ -268,8 +268,6 @@ pub(crate) fn get_branches( tenantid: &ZTenantId, include_non_incremental_logical_size: bool, ) -> Result> { - let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?; - // Each branch has a corresponding record (text file) in the refs/branches // with timeline_id. let branches_dir = conf.branches_path(tenantid); @@ -292,7 +290,7 @@ pub(crate) fn get_branches( })?; BranchInfo::from_path( dir_entry.path(), - repo.as_ref(), + *tenantid, include_non_incremental_logical_size, ) }) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 4794bf72b9..3449585b63 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -138,8 +138,7 @@ async fn branch_detail_handler(request: Request) -> Result, let response_data = tokio::task::spawn_blocking(move || { let _enter = info_span!("branch_detail", tenant = %tenantid, branch=%branch_name).entered(); - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - BranchInfo::from_path(path, repo.as_ref(), include_non_incremental_logical_size) + BranchInfo::from_path(path, tenantid, include_non_incremental_logical_size) }) .await .map_err(ApiError::from_err)??; diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index fd4d494d3b..c3d910549b 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -860,7 +860,10 @@ impl Timeline for LayeredTimeline { } fn hint_partitioning(&self, partitioning: KeyPartitioning, lsn: Lsn) -> Result<()> { - self.partitioning.write().unwrap().replace((partitioning, lsn)); + self.partitioning + .write() + .unwrap() + .replace((partitioning, lsn)); Ok(()) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 01f52e7a2d..fb68490d44 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -18,8 +18,9 @@ use postgres_ffi::{pg_constants, Oid, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::ops::Range; +use std::sync::atomic::{AtomicIsize, Ordering}; use std::sync::{Arc, RwLockReadGuard}; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::AtomicLsn; use zenith_utils::lsn::{Lsn, RecordLsn}; @@ -33,6 +34,7 @@ where { pub tline: Arc, pub last_partitioning: AtomicLsn, + pub current_logical_size: AtomicIsize, } #[derive(Debug, Serialize, Deserialize)] @@ -73,9 +75,19 @@ impl DatadirTimeline { DatadirTimeline { tline, last_partitioning: AtomicLsn::new(0), + current_logical_size: AtomicIsize::new(0), } } + pub fn init_logical_size(&self) -> Result<()> { + let last_lsn = self.tline.get_last_record_lsn(); + self.current_logical_size.store( + self.get_current_logical_size_non_incremental(last_lsn)? as isize, + Ordering::SeqCst, + ); + Ok(()) + } + //------------------------------------------------------------------------------ // Public GET functions //------------------------------------------------------------------------------ @@ -270,6 +282,7 @@ impl DatadirTimeline { lsn, pending_updates: HashMap::new(), pending_deletions: Vec::new(), + pending_nblocks: 0, } } @@ -286,17 +299,41 @@ impl DatadirTimeline { /// Retrieve current logical size of the timeline /// /// NOTE: counted incrementally, includes ancestors, - /// doesnt support TwoPhase relishes yet pub fn get_current_logical_size(&self) -> usize { - //todo!() - 0 + let current_logical_size = self.current_logical_size.load(Ordering::Acquire); + match usize::try_from(current_logical_size) { + Ok(sz) => sz, + Err(_) => { + error!( + "current_logical_size is out of range: {}", + current_logical_size + ); + 0 + } + } } /// Does the same as get_current_logical_size but counted on demand. - /// Used in tests to ensure that incremental and non incremental variants match. - pub fn get_current_logical_size_non_incremental(&self, _lsn: Lsn) -> Result { - //todo!() - Ok(0) + /// Used to initialize the logical size tracking on startup. + /// + /// Only relation blocks are counted currently. That excludes metadata, + /// SLRUs, twophase files etc. + pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result { + // Fetch list of database dirs and iterate them + let buf = self.tline.get(DBDIR_KEY, lsn)?; + let dbdir = DbDirectory::des(&buf)?; + + let mut total_size: usize = 0; + for (spcnode, dbnode) in dbdir.dbs { + for rel in self.list_rels(spcnode, dbnode, lsn)? { + let relsize_key = rel_size_to_key(rel); + let mut buf = self.tline.get(relsize_key, lsn)?; + let relsize = buf.get_u32_le(); + + total_size += relsize as usize; + } + } + Ok(total_size * pg_constants::BLCKSZ as usize) } fn collect_keyspace(&self, lsn: Lsn) -> Result { @@ -375,6 +412,7 @@ pub struct DatadirTimelineWriter<'a, R: Repository> { lsn: Lsn, pending_updates: HashMap, pending_deletions: Vec>, + pending_nblocks: isize, } // TODO Currently, Deref is used to allow easy access to read methods from this trait. @@ -534,6 +572,8 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> { ); } + // FIXME: update pending_nblocks + // Delete all relations and metadata files for the spcnode/dnode self.delete(dbdir_key_range(spcnode, dbnode)); Ok(()) @@ -568,6 +608,8 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> { let buf = nblocks.to_le_bytes(); self.put(size_key, Value::Image(Bytes::from(buf.to_vec()))); + self.pending_nblocks += nblocks as isize; + // even if nblocks > 0, we don't insert any actual blocks here Ok(()) @@ -577,8 +619,13 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> { pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> { // Put size let size_key = rel_size_to_key(rel); + + let old_size = self.get(size_key)?.get_u32_le(); + let buf = nblocks.to_le_bytes(); self.put(size_key, Value::Image(Bytes::from(buf.to_vec()))); + + self.pending_nblocks -= old_size as isize - nblocks as isize; Ok(()) } @@ -629,8 +676,13 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> { pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> { // Put size let size_key = rel_size_to_key(rel); + + let old_size = self.get(size_key)?.get_u32_le(); + let buf = nblocks.to_le_bytes(); self.put(size_key, Value::Image(Bytes::from(buf.to_vec()))); + + self.pending_nblocks += nblocks as isize - old_size as isize; Ok(()) } @@ -647,6 +699,11 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> { warn!("dropped rel {} did not exist in rel directory", rel); } + // update logical size + let size_key = rel_size_to_key(rel); + let old_size = self.get(size_key)?.get_u32_le(); + self.pending_nblocks -= old_size as isize; + // Delete size entry, as well as all blocks self.delete(rel_key_range(rel)); @@ -704,23 +761,33 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> { let writer = self.tline.tline.writer(); let last_partitioning = self.last_partitioning.load(); + let pending_nblocks = self.pending_nblocks; for (key, value) in self.pending_updates { writer.put(key, self.lsn, value)?; } for key_range in self.pending_deletions { - writer.delete(key_range, self.lsn)?; + writer.delete(key_range.clone(), self.lsn)?; } writer.advance_last_record_lsn(self.lsn); - if last_partitioning == Lsn(0) || self.lsn.0 - last_partitioning.0 > TARGET_FILE_SIZE_BYTES / 8 { + if last_partitioning == Lsn(0) + || self.lsn.0 - last_partitioning.0 > TARGET_FILE_SIZE_BYTES / 8 + { let mut partitioning = self.tline.collect_keyspace(self.lsn)?; partitioning.repartition(TARGET_FILE_SIZE_BYTES); self.tline.tline.hint_partitioning(partitioning, self.lsn)?; self.tline.last_partitioning.store(self.lsn); } + if pending_nblocks != 0 { + self.tline.current_logical_size.fetch_add( + pending_nblocks * pg_constants::BLCKSZ as isize, + Ordering::SeqCst, + ); + } + Ok(()) } diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 7adea39b6a..dc53ffebbe 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -284,6 +284,7 @@ pub fn get_timeline_for_tenant( .with_context(|| format!("cannot fetch timeline {}", timelineid))?; let page_tline = Arc::new(DatadirTimelineImpl::new(tline)); + page_tline.init_logical_size()?; tenant.timelines.insert(timelineid, Arc::clone(&page_tline)); Ok(page_tline) }