diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index a4f270580f..232c202ed9 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -129,9 +129,9 @@ pub struct LocalTimelineInfo { pub latest_gc_cutoff_lsn: Lsn, #[serde_as(as = "DisplayFromStr")] pub disk_consistent_lsn: Lsn, - pub current_logical_size: Option, // is None when timeline is Unloaded - pub current_physical_size: Option, // is None when timeline is Unloaded - pub current_logical_size_non_incremental: Option, + pub current_logical_size: Option, // is None when timeline is Unloaded + pub current_physical_size: Option, // is None when timeline is Unloaded + pub current_logical_size_non_incremental: Option, pub current_physical_size_non_incremental: Option, pub timeline_state: LocalTimelineState, diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index 1b77f1fab4..8f3004af98 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -15,7 +15,7 @@ use std::fs::{File, OpenOptions}; use std::io::Write; use std::ops::{Deref, Range}; use std::path::PathBuf; -use std::sync::atomic::{self, AtomicBool, AtomicIsize, Ordering as AtomicOrdering}; +use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError}; use std::time::{Duration, Instant, SystemTime}; @@ -376,7 +376,22 @@ pub struct Timeline { repartition_threshold: u64, /// Current logical size of the "datadir", at the last LSN. - current_logical_size: AtomicIsize, + /// + /// Size shouldn't ever be negative, but this is signed for two reasons: + /// + /// 1. If we initialized the "baseline" size lazily, while we already + /// process incoming WAL, the incoming WAL records could decrement the + /// variable and temporarily make it negative. (This is just future-proofing; + /// the initialization is currently not done lazily.) + /// + /// 2. If there is a bug and we e.g. forget to increment it in some cases + /// when size grows, but remember to decrement it when it shrinks again, the + /// variable could go negative. In that case, it seems better to at least + /// try to keep tracking it, rather than clamp or overflow it. Note that + /// get_current_logical_size() will clamp the returned value to zero if it's + /// negative, and log an error. Could set it permanently to zero or some + /// special value to indicate "broken" instead, but this will do for now. + current_logical_size: AtomicI64, /// Information about the last processed message by the WAL receiver, /// or None if WAL receiver has not received anything for this timeline @@ -695,7 +710,7 @@ impl Timeline { latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()), initdb_lsn: metadata.initdb_lsn(), - current_logical_size: AtomicIsize::new(0), + current_logical_size: AtomicI64::new(0), partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))), repartition_threshold: 0, @@ -813,7 +828,7 @@ impl Timeline { // Logical size 0 means that it was not initialized, so don't believe that. if ancestor_logical_size != 0 && ancestor.get_last_record_lsn() == self.ancestor_lsn { self.current_logical_size - .store(ancestor_logical_size as isize, AtomicOrdering::SeqCst); + .store(ancestor_logical_size as i64, AtomicOrdering::SeqCst); debug!( "logical size copied from ancestor: {}", ancestor_logical_size @@ -828,7 +843,7 @@ impl Timeline { let last_lsn = self.get_last_record_lsn(); let logical_size = self.get_current_logical_size_non_incremental(last_lsn)?; self.current_logical_size - .store(logical_size as isize, AtomicOrdering::SeqCst); + .store(logical_size as i64, AtomicOrdering::SeqCst); debug!("calculated logical size the hard way: {}", logical_size); timer.stop_and_record(); @@ -837,10 +852,10 @@ impl Timeline { /// Retrieve current logical size of the timeline /// - /// NOTE: counted incrementally, includes ancestors, - pub fn get_current_logical_size(&self) -> usize { + /// NOTE: counted incrementally, includes ancestors. + pub fn get_current_logical_size(&self) -> u64 { let current_logical_size = self.current_logical_size.load(AtomicOrdering::Acquire); - match usize::try_from(current_logical_size) { + match u64::try_from(current_logical_size) { Ok(sz) => sz, Err(_) => { error!( @@ -2245,7 +2260,7 @@ impl<'a> TimelineWriter<'a> { self.tl.finish_write(new_lsn); } - pub fn update_current_logical_size(&self, delta: isize) { + pub fn update_current_logical_size(&self, delta: i64) { self.tl .current_logical_size .fetch_add(delta, AtomicOrdering::SeqCst); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index beaac292ec..0ace850a82 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -364,22 +364,22 @@ impl Timeline { /// /// Only relation blocks are counted currently. That excludes metadata, /// SLRUs, twophase files etc. - pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result { + pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result { // Fetch list of database dirs and iterate them let buf = self.get(DBDIR_KEY, lsn)?; let dbdir = DbDirectory::des(&buf)?; - let mut total_size: usize = 0; + let mut total_size: u64 = 0; for (spcnode, dbnode) in dbdir.dbdirs.keys() { for rel in self.list_rels(*spcnode, *dbnode, lsn)? { let relsize_key = rel_size_to_key(rel); let mut buf = self.get(relsize_key, lsn)?; let relsize = buf.get_u32_le(); - total_size += relsize as usize; + total_size += relsize as u64; } } - Ok(total_size * BLCKSZ as usize) + Ok(total_size * BLCKSZ as u64) } /// @@ -517,7 +517,7 @@ pub struct DatadirModification<'a> { // underlying key-value store by the 'finish' function. pending_updates: HashMap, pending_deletions: Vec>, - pending_nblocks: isize, + pending_nblocks: i64, } impl<'a> DatadirModification<'a> { @@ -676,7 +676,7 @@ impl<'a> DatadirModification<'a> { } // Update logical database size. - self.pending_nblocks -= total_blocks as isize; + self.pending_nblocks -= total_blocks as i64; // Delete all relations and metadata files for the spcnode/dnode self.delete(dbdir_key_range(spcnode, dbnode)); @@ -719,7 +719,7 @@ impl<'a> DatadirModification<'a> { let buf = nblocks.to_le_bytes(); self.put(size_key, Value::Image(Bytes::from(buf.to_vec()))); - self.pending_nblocks += nblocks as isize; + self.pending_nblocks += nblocks as i64; // Update relation size cache self.tline.set_cached_rel_size(rel, self.lsn, nblocks); @@ -749,7 +749,7 @@ impl<'a> DatadirModification<'a> { self.tline.set_cached_rel_size(rel, self.lsn, nblocks); // Update logical database size. - self.pending_nblocks -= old_size as isize - nblocks as isize; + self.pending_nblocks -= old_size as i64 - nblocks as i64; } Ok(()) } @@ -771,7 +771,7 @@ impl<'a> DatadirModification<'a> { // Update relation size cache self.tline.set_cached_rel_size(rel, self.lsn, nblocks); - self.pending_nblocks += nblocks as isize - old_size as isize; + self.pending_nblocks += nblocks as i64 - old_size as i64; } Ok(()) } @@ -794,7 +794,7 @@ impl<'a> DatadirModification<'a> { // update logical size let size_key = rel_size_to_key(rel); let old_size = self.get(size_key)?.get_u32_le(); - self.pending_nblocks -= old_size as isize; + self.pending_nblocks -= old_size as i64; // Remove enty from relation size cache self.tline.remove_cached_rel_size(&rel); @@ -936,7 +936,7 @@ impl<'a> DatadirModification<'a> { result?; if pending_nblocks != 0 { - writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize); + writer.update_current_logical_size(pending_nblocks * BLCKSZ as i64); self.pending_nblocks = 0; } @@ -964,7 +964,7 @@ impl<'a> DatadirModification<'a> { writer.finish_write(lsn); if pending_nblocks != 0 { - writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize); + writer.update_current_logical_size(pending_nblocks * BLCKSZ as i64); } Ok(())