mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
Fix logical timeline size tracking
This commit is contained in:
@@ -23,11 +23,11 @@ use zenith_utils::{crashsafe_dir, logging};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::pgdatadir_mapping::DatadirTimeline;
|
||||
use crate::repository::{Repository, Timeline};
|
||||
use crate::tenant_mgr;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::CheckpointConfig;
|
||||
use crate::RepositoryImpl;
|
||||
use crate::{import_datadir, LOG_FILE_NAME};
|
||||
use crate::{repository::RepositoryTimeline, tenant_mgr};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct BranchInfo {
|
||||
@@ -42,10 +42,10 @@ pub struct BranchInfo {
|
||||
}
|
||||
|
||||
impl BranchInfo {
|
||||
pub fn from_path<R: Repository, T: AsRef<Path>>(
|
||||
pub fn from_path<T: AsRef<Path>>(
|
||||
path: T,
|
||||
repo: &R,
|
||||
_include_non_incremental_logical_size: bool,
|
||||
tenantid: ZTenantId,
|
||||
include_non_incremental_logical_size: bool,
|
||||
) -> Result<Self> {
|
||||
let path = path.as_ref();
|
||||
let name = path.file_name().unwrap().to_string_lossy().to_string();
|
||||
@@ -58,35 +58,35 @@ impl BranchInfo {
|
||||
})?
|
||||
.parse::<ZTimelineId>()?;
|
||||
|
||||
let timeline = match repo.get_timeline(timeline_id)? {
|
||||
RepositoryTimeline::Local(local_entry) => local_entry,
|
||||
RepositoryTimeline::Remote { .. } => {
|
||||
bail!("Timeline {} is remote, no branches to display", timeline_id)
|
||||
let timeline = match tenant_mgr::get_timeline_for_tenant(tenantid, timeline_id) {
|
||||
Ok(timeline) => timeline,
|
||||
Err(err) => {
|
||||
// FIXME: this was:
|
||||
// bail!("Timeline {} is remote, no branches to display", timeline_id)
|
||||
//
|
||||
// but we cannot distinguish that from other errors now. Have
|
||||
// get_timeline_for_tenant() return a more specific error
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
// we use ancestor lsn zero if we don't have an ancestor, so turn this into an option based on timeline id
|
||||
let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() {
|
||||
let (ancestor_id, ancestor_lsn) = match timeline.tline.get_ancestor_timeline_id() {
|
||||
Some(ancestor_id) => (
|
||||
Some(ancestor_id.to_string()),
|
||||
Some(timeline.get_ancestor_lsn().to_string()),
|
||||
Some(timeline.tline.get_ancestor_lsn().to_string()),
|
||||
),
|
||||
None => (None, None),
|
||||
};
|
||||
|
||||
// non incremental size calculation can be heavy, so let it be optional
|
||||
// needed for tests to check size calculation
|
||||
//
|
||||
// FIXME
|
||||
/*
|
||||
let current_logical_size_non_incremental = include_non_incremental_logical_size
|
||||
.then(|| {
|
||||
timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn())
|
||||
})
|
||||
.transpose()?;
|
||||
*/
|
||||
let current_logical_size_non_incremental = Some(0);
|
||||
let current_logical_size = 0;
|
||||
let current_logical_size = timeline.get_current_logical_size();
|
||||
|
||||
Ok(BranchInfo {
|
||||
name,
|
||||
@@ -94,7 +94,7 @@ impl BranchInfo {
|
||||
latest_valid_lsn: timeline.get_last_record_lsn(),
|
||||
ancestor_id,
|
||||
ancestor_lsn,
|
||||
current_logical_size, // : timeline.get_current_logical_size(),
|
||||
current_logical_size,
|
||||
current_logical_size_non_incremental,
|
||||
})
|
||||
}
|
||||
@@ -268,8 +268,6 @@ pub(crate) fn get_branches(
|
||||
tenantid: &ZTenantId,
|
||||
include_non_incremental_logical_size: bool,
|
||||
) -> Result<Vec<BranchInfo>> {
|
||||
let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?;
|
||||
|
||||
// Each branch has a corresponding record (text file) in the refs/branches
|
||||
// with timeline_id.
|
||||
let branches_dir = conf.branches_path(tenantid);
|
||||
@@ -292,7 +290,7 @@ pub(crate) fn get_branches(
|
||||
})?;
|
||||
BranchInfo::from_path(
|
||||
dir_entry.path(),
|
||||
repo.as_ref(),
|
||||
*tenantid,
|
||||
include_non_incremental_logical_size,
|
||||
)
|
||||
})
|
||||
|
||||
@@ -138,8 +138,7 @@ async fn branch_detail_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
|
||||
let response_data = tokio::task::spawn_blocking(move || {
|
||||
let _enter = info_span!("branch_detail", tenant = %tenantid, branch=%branch_name).entered();
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
BranchInfo::from_path(path, repo.as_ref(), include_non_incremental_logical_size)
|
||||
BranchInfo::from_path(path, tenantid, include_non_incremental_logical_size)
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
|
||||
@@ -860,7 +860,10 @@ impl Timeline for LayeredTimeline {
|
||||
}
|
||||
|
||||
fn hint_partitioning(&self, partitioning: KeyPartitioning, lsn: Lsn) -> Result<()> {
|
||||
self.partitioning.write().unwrap().replace((partitioning, lsn));
|
||||
self.partitioning
|
||||
.write()
|
||||
.unwrap()
|
||||
.replace((partitioning, lsn));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -18,8 +18,9 @@ use postgres_ffi::{pg_constants, Oid, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicIsize, Ordering};
|
||||
use std::sync::{Arc, RwLockReadGuard};
|
||||
use tracing::{debug, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::AtomicLsn;
|
||||
use zenith_utils::lsn::{Lsn, RecordLsn};
|
||||
@@ -33,6 +34,7 @@ where
|
||||
{
|
||||
pub tline: Arc<R::Timeline>,
|
||||
pub last_partitioning: AtomicLsn,
|
||||
pub current_logical_size: AtomicIsize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -73,9 +75,19 @@ impl<R: Repository> DatadirTimeline<R> {
|
||||
DatadirTimeline {
|
||||
tline,
|
||||
last_partitioning: AtomicLsn::new(0),
|
||||
current_logical_size: AtomicIsize::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init_logical_size(&self) -> Result<()> {
|
||||
let last_lsn = self.tline.get_last_record_lsn();
|
||||
self.current_logical_size.store(
|
||||
self.get_current_logical_size_non_incremental(last_lsn)? as isize,
|
||||
Ordering::SeqCst,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Public GET functions
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -270,6 +282,7 @@ impl<R: Repository> DatadirTimeline<R> {
|
||||
lsn,
|
||||
pending_updates: HashMap::new(),
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -286,17 +299,41 @@ impl<R: Repository> DatadirTimeline<R> {
|
||||
/// Retrieve current logical size of the timeline
|
||||
///
|
||||
/// NOTE: counted incrementally, includes ancestors,
|
||||
/// doesnt support TwoPhase relishes yet
|
||||
pub fn get_current_logical_size(&self) -> usize {
|
||||
//todo!()
|
||||
0
|
||||
let current_logical_size = self.current_logical_size.load(Ordering::Acquire);
|
||||
match usize::try_from(current_logical_size) {
|
||||
Ok(sz) => sz,
|
||||
Err(_) => {
|
||||
error!(
|
||||
"current_logical_size is out of range: {}",
|
||||
current_logical_size
|
||||
);
|
||||
0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Does the same as get_current_logical_size but counted on demand.
|
||||
/// Used in tests to ensure that incremental and non incremental variants match.
|
||||
pub fn get_current_logical_size_non_incremental(&self, _lsn: Lsn) -> Result<usize> {
|
||||
//todo!()
|
||||
Ok(0)
|
||||
/// Used to initialize the logical size tracking on startup.
|
||||
///
|
||||
/// Only relation blocks are counted currently. That excludes metadata,
|
||||
/// SLRUs, twophase files etc.
|
||||
pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
|
||||
// Fetch list of database dirs and iterate them
|
||||
let buf = self.tline.get(DBDIR_KEY, lsn)?;
|
||||
let dbdir = DbDirectory::des(&buf)?;
|
||||
|
||||
let mut total_size: usize = 0;
|
||||
for (spcnode, dbnode) in dbdir.dbs {
|
||||
for rel in self.list_rels(spcnode, dbnode, lsn)? {
|
||||
let relsize_key = rel_size_to_key(rel);
|
||||
let mut buf = self.tline.get(relsize_key, lsn)?;
|
||||
let relsize = buf.get_u32_le();
|
||||
|
||||
total_size += relsize as usize;
|
||||
}
|
||||
}
|
||||
Ok(total_size * pg_constants::BLCKSZ as usize)
|
||||
}
|
||||
|
||||
fn collect_keyspace(&self, lsn: Lsn) -> Result<KeyPartitioning> {
|
||||
@@ -375,6 +412,7 @@ pub struct DatadirTimelineWriter<'a, R: Repository> {
|
||||
lsn: Lsn,
|
||||
pending_updates: HashMap<Key, Value>,
|
||||
pending_deletions: Vec<Range<Key>>,
|
||||
pending_nblocks: isize,
|
||||
}
|
||||
|
||||
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
|
||||
@@ -534,6 +572,8 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> {
|
||||
);
|
||||
}
|
||||
|
||||
// FIXME: update pending_nblocks
|
||||
|
||||
// Delete all relations and metadata files for the spcnode/dnode
|
||||
self.delete(dbdir_key_range(spcnode, dbnode));
|
||||
Ok(())
|
||||
@@ -568,6 +608,8 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> {
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
|
||||
self.pending_nblocks += nblocks as isize;
|
||||
|
||||
// even if nblocks > 0, we don't insert any actual blocks here
|
||||
|
||||
Ok(())
|
||||
@@ -577,8 +619,13 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> {
|
||||
pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
|
||||
let old_size = self.get(size_key)?.get_u32_le();
|
||||
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
|
||||
self.pending_nblocks -= old_size as isize - nblocks as isize;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -629,8 +676,13 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> {
|
||||
pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
|
||||
let old_size = self.get(size_key)?.get_u32_le();
|
||||
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
|
||||
self.pending_nblocks += nblocks as isize - old_size as isize;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -647,6 +699,11 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> {
|
||||
warn!("dropped rel {} did not exist in rel directory", rel);
|
||||
}
|
||||
|
||||
// update logical size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let old_size = self.get(size_key)?.get_u32_le();
|
||||
self.pending_nblocks -= old_size as isize;
|
||||
|
||||
// Delete size entry, as well as all blocks
|
||||
self.delete(rel_key_range(rel));
|
||||
|
||||
@@ -704,23 +761,33 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> {
|
||||
let writer = self.tline.tline.writer();
|
||||
|
||||
let last_partitioning = self.last_partitioning.load();
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
|
||||
for (key, value) in self.pending_updates {
|
||||
writer.put(key, self.lsn, value)?;
|
||||
}
|
||||
for key_range in self.pending_deletions {
|
||||
writer.delete(key_range, self.lsn)?;
|
||||
writer.delete(key_range.clone(), self.lsn)?;
|
||||
}
|
||||
|
||||
writer.advance_last_record_lsn(self.lsn);
|
||||
|
||||
if last_partitioning == Lsn(0) || self.lsn.0 - last_partitioning.0 > TARGET_FILE_SIZE_BYTES / 8 {
|
||||
if last_partitioning == Lsn(0)
|
||||
|| self.lsn.0 - last_partitioning.0 > TARGET_FILE_SIZE_BYTES / 8
|
||||
{
|
||||
let mut partitioning = self.tline.collect_keyspace(self.lsn)?;
|
||||
partitioning.repartition(TARGET_FILE_SIZE_BYTES);
|
||||
self.tline.tline.hint_partitioning(partitioning, self.lsn)?;
|
||||
self.tline.last_partitioning.store(self.lsn);
|
||||
}
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
self.tline.current_logical_size.fetch_add(
|
||||
pending_nblocks * pg_constants::BLCKSZ as isize,
|
||||
Ordering::SeqCst,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -284,6 +284,7 @@ pub fn get_timeline_for_tenant(
|
||||
.with_context(|| format!("cannot fetch timeline {}", timelineid))?;
|
||||
|
||||
let page_tline = Arc::new(DatadirTimelineImpl::new(tline));
|
||||
page_tline.init_logical_size()?;
|
||||
tenant.timelines.insert(timelineid, Arc::clone(&page_tline));
|
||||
Ok(page_tline)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user