mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 17:40:37 +00:00
Calculate timeline initial logical size in the background
Start the calculation on the first size request, return the partially calculated size while the calculation is in progress, and retry if it fails. Remove the "fast" size init through the ancestor: the current approach is fast enough for now, and there are better ways to optimize the calculation via incremental ancestor size computation.
This commit is contained in:
committed by
Kirill Bulatov
parent
8a7333438a
commit
f78a542cba
@@ -75,7 +75,7 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
|
||||
// Helper functions to construct a LocalTimelineInfo struct for a timeline
|
||||
|
||||
fn local_timeline_info_from_loaded_timeline(
|
||||
timeline: &Timeline,
|
||||
timeline: &Arc<Timeline>,
|
||||
include_non_incremental_logical_size: bool,
|
||||
include_non_incremental_physical_size: bool,
|
||||
) -> anyhow::Result<LocalTimelineInfo> {
|
||||
@@ -106,7 +106,11 @@ fn local_timeline_info_from_loaded_timeline(
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
timeline_state: LocalTimelineState::Loaded,
|
||||
current_logical_size: Some(timeline.get_current_logical_size()),
|
||||
current_logical_size: Some(
|
||||
timeline
|
||||
.get_current_logical_size()
|
||||
.context("Timeline info creation failed to get current logical size")?,
|
||||
),
|
||||
current_physical_size: Some(timeline.get_physical_size()),
|
||||
current_logical_size_non_incremental: if include_non_incremental_logical_size {
|
||||
Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
|
||||
@@ -212,7 +216,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
) {
|
||||
Ok(Some((new_timeline_id, new_timeline))) => {
|
||||
// Created. Construct a TimelineInfo for it.
|
||||
let local_info = local_timeline_info_from_loaded_timeline(new_timeline.as_ref(), false, false)?;
|
||||
let local_info = local_timeline_info_from_loaded_timeline(&new_timeline, false, false)?;
|
||||
Ok(Some(TimelineInfo {
|
||||
tenant_id,
|
||||
timeline_id: new_timeline_id,
|
||||
|
||||
@@ -136,14 +136,11 @@ impl Repository {
|
||||
}
|
||||
|
||||
/// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded.
|
||||
pub fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<Timeline>> {
|
||||
pub fn get_timeline_load(&self, timeline_id: ZTimelineId) -> Result<Arc<Timeline>> {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
match self.get_timeline_load_internal(timelineid, &mut timelines)? {
|
||||
match self.get_timeline_load_internal(timeline_id, &mut timelines)? {
|
||||
Some(local_loaded_timeline) => Ok(local_loaded_timeline),
|
||||
None => anyhow::bail!(
|
||||
"cannot get local timeline: unknown timeline id: {}",
|
||||
timelineid
|
||||
),
|
||||
None => anyhow::bail!("cannot get local timeline, unknown timeline id: {timeline_id}"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -559,33 +556,34 @@ impl Repository {
|
||||
timeline_id: ZTimelineId,
|
||||
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
|
||||
) -> anyhow::Result<Option<Arc<Timeline>>> {
|
||||
match timelines.get(&timeline_id) {
|
||||
Ok(match timelines.get(&timeline_id) {
|
||||
Some(entry) => match entry {
|
||||
LayeredTimelineEntry::Loaded(local_timeline) => {
|
||||
debug!("timeline {timeline_id} found loaded into memory");
|
||||
return Ok(Some(Arc::clone(local_timeline)));
|
||||
Some(Arc::clone(local_timeline))
|
||||
}
|
||||
LayeredTimelineEntry::Unloaded { .. } => {
|
||||
debug!(
|
||||
"timeline {timeline_id} found on a local disk, but not loaded into the memory, loading"
|
||||
);
|
||||
let timeline = self.load_local_timeline(timeline_id, timelines)?;
|
||||
let was_loaded = timelines.insert(
|
||||
timeline_id,
|
||||
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
|
||||
);
|
||||
ensure!(
|
||||
was_loaded.is_none()
|
||||
|| matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. })),
|
||||
"assertion failure, inserted wrong timeline in an incorrect state"
|
||||
);
|
||||
Some(timeline)
|
||||
}
|
||||
LayeredTimelineEntry::Unloaded { .. } => {}
|
||||
},
|
||||
None => {
|
||||
debug!("timeline {timeline_id} not found");
|
||||
return Ok(None);
|
||||
None
|
||||
}
|
||||
};
|
||||
debug!(
|
||||
"timeline {timeline_id} found on a local disk, but not loaded into the memory, loading"
|
||||
);
|
||||
let timeline = self.load_local_timeline(timeline_id, timelines)?;
|
||||
let was_loaded = timelines.insert(
|
||||
timeline_id,
|
||||
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
|
||||
);
|
||||
ensure!(
|
||||
was_loaded.is_none()
|
||||
|| matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. })),
|
||||
"assertion failure, inserted wrong timeline in an incorrect state"
|
||||
);
|
||||
Ok(Some(timeline))
|
||||
})
|
||||
}
|
||||
|
||||
fn load_local_timeline(
|
||||
|
||||
@@ -5,17 +5,17 @@ use bytes::Bytes;
|
||||
use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
use metrics::core::{AtomicU64, GenericCounter};
|
||||
use once_cell::sync::Lazy;
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use tracing::*;
|
||||
|
||||
use std::cmp::{max, min, Ordering};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs;
|
||||
use std::ops::{Deref, Range};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
|
||||
use std::sync::{mpsc, Arc, Mutex, MutexGuard, RwLock, TryLockError};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{fs, thread};
|
||||
|
||||
use metrics::{
|
||||
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
|
||||
@@ -137,13 +137,13 @@ static CURRENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static CURRENT_LOGICAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_current_logical_size",
|
||||
"Current logical size grouped by timeline",
|
||||
&["tenant_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
.expect("failed to define current logical size metric")
|
||||
});
|
||||
|
||||
// Metrics for cloud upload. These metrics reflect data uploaded to cloud storage,
|
||||
@@ -242,7 +242,7 @@ struct TimelineMetrics {
|
||||
pub wait_lsn_time_histo: Histogram,
|
||||
pub current_physical_size_gauge: UIntGauge,
|
||||
/// copy of LayeredTimeline.current_logical_size
|
||||
pub current_logical_size_gauge: IntGauge,
|
||||
pub current_logical_size_gauge: UIntGauge,
|
||||
}
|
||||
|
||||
impl TimelineMetrics {
|
||||
@@ -389,6 +389,37 @@ pub struct Timeline {
|
||||
repartition_threshold: u64,
|
||||
|
||||
/// Current logical size of the "datadir", at the last LSN.
|
||||
current_logical_size: LogicalSize,
|
||||
// TODO task management should be done outside timeline, managed along with other tasks.
|
||||
#[allow(clippy::type_complexity)]
|
||||
initial_size_computation_task:
|
||||
Mutex<Option<(thread::JoinHandle<anyhow::Result<()>>, mpsc::Receiver<()>)>>,
|
||||
|
||||
/// Information about the last processed message by the WAL receiver,
|
||||
/// or None if WAL receiver has not received anything for this timeline
|
||||
/// yet.
|
||||
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
|
||||
|
||||
/// Relation size cache
|
||||
pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
|
||||
}
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
/// Calculation consists of two parts:
|
||||
/// 1. Initial size calculation. That might take a long time, because it requires
|
||||
/// reading all layers containing relation sizes up to the `initial_part_end`.
|
||||
/// 2. Collecting an incremental part and adding that to the initial size.
|
||||
/// Increments are appended on walreceiver writing new timeline data,
|
||||
/// which result in increase or decrease of the logical size.
|
||||
struct LogicalSize {
|
||||
/// Size, potentially slow to compute, derived from all layers located locally on this node's FS.
|
||||
/// Might require reading multiple layers, and even ancestor's layers, to collect the size.
|
||||
///
|
||||
/// NOTE: initial size is not a constant and will change between restarts.
|
||||
initial_logical_size: OnceCell<u64>,
|
||||
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
|
||||
initial_part_end: Option<Lsn>,
|
||||
/// All other size changes after startup, combined together.
|
||||
///
|
||||
/// Size shouldn't ever be negative, but this is signed for two reasons:
|
||||
///
|
||||
@@ -407,22 +438,82 @@ pub struct Timeline {
|
||||
///
|
||||
/// Note that we also expose a copy of this value as a prometheus metric,
|
||||
/// see `current_logical_size_gauge`. Use the `update_current_logical_size`
|
||||
/// and `set_current_logical_size` functions to modify this, they will
|
||||
/// also keep the prometheus metric in sync.
|
||||
current_logical_size: AtomicI64,
|
||||
// TODO we don't have a good API to ensure at the compilation level
|
||||
// that the timeline passes all initialization.
|
||||
// Hence we ensure that we init at least once for every timeline
|
||||
// and keep this flag to avoid potentially long recomputes.
|
||||
logical_size_initialized: AtomicBool,
|
||||
/// to modify this, it will also keep the prometheus metric in sync.
|
||||
size_added_after_initial: AtomicI64,
|
||||
}
|
||||
|
||||
/// Information about the last processed message by the WAL receiver,
|
||||
/// or None if WAL receiver has not received anything for this timeline
|
||||
/// yet.
|
||||
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
|
||||
/// Normalized current size, that the data in pageserver occupies.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum CurrentLogicalSize {
|
||||
/// The size is not yet calculated to the end, this is an intermediate result,
|
||||
/// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
|
||||
/// yet total logical size cannot be below 0.
|
||||
Approximate(u64),
|
||||
/// Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
|
||||
/// available for observation without any calculations.
|
||||
Exact(u64),
|
||||
}
|
||||
|
||||
/// Relation size cache
|
||||
pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
|
||||
impl CurrentLogicalSize {
|
||||
fn size(&self) -> u64 {
|
||||
*match self {
|
||||
Self::Approximate(size) => size,
|
||||
Self::Exact(size) => size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LogicalSize {
|
||||
fn empty_initial() -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::with_value(0),
|
||||
initial_part_end: None,
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
fn deferred_initial(compute_to: Lsn) -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::new(),
|
||||
initial_part_end: Some(compute_to),
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
|
||||
let size_increment = self.size_added_after_initial.load(AtomicOrdering::Acquire);
|
||||
match self.initial_logical_size.get() {
|
||||
Some(initial_size) => {
|
||||
let absolute_size_increment = u64::try_from(
|
||||
size_increment
|
||||
.checked_abs()
|
||||
.with_context(|| format!("Size added after initial {size_increment} is not expected to be i64::MIN"))?,
|
||||
).with_context(|| format!("Failed to convert size increment {size_increment} to u64"))?;
|
||||
|
||||
if size_increment < 0 {
|
||||
initial_size.checked_sub(absolute_size_increment)
|
||||
} else {
|
||||
initial_size.checked_add(absolute_size_increment)
|
||||
}.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
|
||||
.map(CurrentLogicalSize::Exact)
|
||||
}
|
||||
None => {
|
||||
let non_negative_size_increment = size_increment.max(0);
|
||||
u64::try_from(non_negative_size_increment)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to convert size increment {non_negative_size_increment} to u64"
|
||||
)
|
||||
})
|
||||
.map(CurrentLogicalSize::Approximate)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn increment_size(&self, delta: i64) {
|
||||
self.size_added_after_initial
|
||||
.fetch_add(delta, AtomicOrdering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WalReceiverInfo {
|
||||
@@ -491,7 +582,9 @@ impl Timeline {
|
||||
/// the Repository implementation may incorrectly return a value from an ancestor
|
||||
/// branch, for example, or waste a lot of cycles chasing the non-existing key.
|
||||
///
|
||||
pub fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes> {
|
||||
pub fn get(&self, key: Key, lsn: Lsn) -> anyhow::Result<Bytes> {
|
||||
anyhow::ensure!(lsn.is_valid(), "Invalid LSN");
|
||||
|
||||
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
|
||||
// The cached image can be returned directly if there is no WAL between the cached image
|
||||
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
|
||||
@@ -694,6 +787,8 @@ impl Timeline {
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
upload_layers: bool,
|
||||
) -> Timeline {
|
||||
let disk_consistent_lsn = metadata.disk_consistent_lsn();
|
||||
|
||||
let mut result = Timeline {
|
||||
conf,
|
||||
tenant_conf,
|
||||
@@ -705,12 +800,12 @@ impl Timeline {
|
||||
|
||||
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
|
||||
last_record_lsn: SeqWait::new(RecordLsn {
|
||||
last: metadata.disk_consistent_lsn(),
|
||||
last: disk_consistent_lsn,
|
||||
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
|
||||
}),
|
||||
disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn().0),
|
||||
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
|
||||
|
||||
last_freeze_at: AtomicLsn::new(metadata.disk_consistent_lsn().0),
|
||||
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
|
||||
last_freeze_ts: RwLock::new(Instant::now()),
|
||||
|
||||
ancestor_timeline: ancestor,
|
||||
@@ -733,8 +828,16 @@ impl Timeline {
|
||||
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
|
||||
initdb_lsn: metadata.initdb_lsn(),
|
||||
|
||||
current_logical_size: AtomicI64::new(0),
|
||||
logical_size_initialized: AtomicBool::new(false),
|
||||
current_logical_size: if disk_consistent_lsn.is_valid() {
|
||||
// we're creating timeline data with some layer files existing locally,
|
||||
// need to recalculate timeline's logical size based on data in the layers.
|
||||
LogicalSize::deferred_initial(disk_consistent_lsn)
|
||||
} else {
|
||||
// we're creating timeline data without any layers existing locally,
|
||||
// initial logical size is 0.
|
||||
LogicalSize::empty_initial()
|
||||
},
|
||||
initial_size_computation_task: Mutex::new(None),
|
||||
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
|
||||
repartition_threshold: 0,
|
||||
|
||||
@@ -835,92 +938,114 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// (Re-)calculate the logical size of the database at the latest LSN.
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// This can be a slow operation.
|
||||
pub fn init_logical_size(&self) -> Result<()> {
|
||||
if self.logical_size_initialized.load(AtomicOrdering::Acquire) {
|
||||
return Ok(());
|
||||
}
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
|
||||
let current_size = self.current_logical_size.current_size()?;
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
// Try a fast-path first:
|
||||
// Copy logical size from ancestor timeline if there has been no changes on this
|
||||
// branch, and no changes on the ancestor branch since the branch point.
|
||||
if self.get_ancestor_lsn() == self.get_last_record_lsn() && self.ancestor_timeline.is_some()
|
||||
let size = current_size.size();
|
||||
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
|
||||
(current_size, self.current_logical_size.initial_part_end)
|
||||
{
|
||||
let ancestor = self.get_ancestor_timeline()?;
|
||||
let ancestor_logical_size = ancestor.get_current_logical_size();
|
||||
// Check LSN after getting logical size to exclude race condition
|
||||
// when ancestor timeline is concurrently updated.
|
||||
//
|
||||
// Logical size 0 means that it was not initialized, so don't believe that.
|
||||
if ancestor_logical_size != 0 && ancestor.get_last_record_lsn() == self.ancestor_lsn {
|
||||
self.set_current_logical_size(ancestor_logical_size);
|
||||
debug!(
|
||||
"logical size copied from ancestor: {}",
|
||||
ancestor_logical_size
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
self.try_spawn_size_init_task(init_lsn);
|
||||
}
|
||||
|
||||
let timer = self.metrics.init_logical_size_histo.start_timer();
|
||||
|
||||
// Have to calculate it the hard way
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let logical_size = self.get_current_logical_size_non_incremental(last_lsn)?;
|
||||
self.set_current_logical_size(logical_size);
|
||||
debug!("calculated logical size the hard way: {}", logical_size);
|
||||
|
||||
timer.stop_and_record();
|
||||
Ok(())
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
/// Retrieve current logical size of the timeline
|
||||
///
|
||||
/// NOTE: counted incrementally, includes ancestors.
|
||||
pub fn get_current_logical_size(&self) -> u64 {
|
||||
let current_logical_size = self.current_logical_size.load(AtomicOrdering::Acquire);
|
||||
match u64::try_from(current_logical_size) {
|
||||
Ok(sz) => sz,
|
||||
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
|
||||
let timeline_id = self.timeline_id;
|
||||
|
||||
let mut task_guard = match self.initial_size_computation_task.try_lock() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
error!(
|
||||
"current_logical_size is out of range: {}",
|
||||
current_logical_size
|
||||
);
|
||||
0
|
||||
debug!("Skipping timeline logical size init: task lock is taken already");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((old_task, task_finish_signal)) = task_guard.take() {
|
||||
// TODO rust 1.61 would allow to remove `task_finish_signal` entirely and call `old_task.is_finished()` instead
|
||||
match task_finish_signal.try_recv() {
|
||||
// task has either signaled successfully that it finished or panicked and dropped the sender part without signalling
|
||||
Ok(()) | Err(mpsc::TryRecvError::Disconnected) => {
|
||||
match old_task.join() {
|
||||
// we're here due to OnceCell::get not returning the value
|
||||
Ok(Ok(())) => {
|
||||
error!("Timeline {timeline_id} size init task finished, yet the size was not updated, rescheduling the computation")
|
||||
}
|
||||
Ok(Err(task_error)) => {
|
||||
error!("Error during timeline {timeline_id} size init: {task_error:?}")
|
||||
}
|
||||
Err(e) => error!("Timeline {timeline_id} size init task panicked: {e:?}"),
|
||||
}
|
||||
}
|
||||
// task had not yet finished: no signal was sent and the sender channel is not dropped
|
||||
Err(mpsc::TryRecvError::Empty) => {
|
||||
// let the task finish
|
||||
*task_guard = Some((old_task, task_finish_signal));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if task_guard.is_none() {
|
||||
let thread_timeline = Arc::clone(self);
|
||||
let (finish_sender, finish_receiver) = mpsc::channel();
|
||||
|
||||
match thread::Builder::new()
|
||||
.name(format!(
|
||||
"Timeline {timeline_id} initial logical size calculation"
|
||||
))
|
||||
.spawn(move || {
|
||||
let _enter = info_span!("initial_logical_size_calculation", timeline = %timeline_id).entered();
|
||||
let calculated_size = thread_timeline.calculate_logical_size(init_lsn)?;
|
||||
match thread_timeline.current_logical_size.initial_logical_size.set(calculated_size) {
|
||||
Ok(()) => info!("Successfully calculated initial logical size"),
|
||||
Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"),
|
||||
}
|
||||
|
||||
finish_sender.send(()).ok();
|
||||
Ok(())
|
||||
}) {
|
||||
Ok(guard) => *task_guard = Some((guard, finish_receiver)),
|
||||
Err(e) => error!("Failed to spawn timeline {timeline_id} size init task: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate the logical size of the database at the latest LSN.
|
||||
///
|
||||
/// NOTE: counted incrementally, includes ancestors, this can be a slow operation.
|
||||
fn calculate_logical_size(&self, up_to_lsn: Lsn) -> anyhow::Result<u64> {
|
||||
info!("Calculating logical size for timeline {}", self.timeline_id);
|
||||
let timer = self.metrics.init_logical_size_histo.start_timer();
|
||||
let logical_size = self.get_current_logical_size_non_incremental(up_to_lsn)?;
|
||||
debug!("calculated logical size: {logical_size}");
|
||||
timer.stop_and_record();
|
||||
Ok(logical_size)
|
||||
}
|
||||
|
||||
/// Update current logical size, adding `delta` to the old value.
|
||||
fn update_current_logical_size(&self, delta: i64) {
|
||||
let new_size = self
|
||||
.current_logical_size
|
||||
.fetch_add(delta, AtomicOrdering::SeqCst);
|
||||
let logical_size = &self.current_logical_size;
|
||||
logical_size.increment_size(delta);
|
||||
|
||||
// Also set the value in the prometheus gauge. Note that
|
||||
// there is a race condition here: if this is called by two
|
||||
// threads concurrently, the prometheus gauge might be set to
|
||||
// one value while current_logical_size is set to the
|
||||
// other. Currently, only initialization and the WAL receiver
|
||||
// updates the logical size, and they don't run concurrently,
|
||||
// so it cannot happen. And even if it did, it wouldn't be
|
||||
// very serious, the metrics would just be slightly off until
|
||||
// the next update.
|
||||
self.metrics.current_logical_size_gauge.set(new_size);
|
||||
}
|
||||
|
||||
/// Set current logical size.
|
||||
fn set_current_logical_size(&self, new_size: u64) {
|
||||
self.current_logical_size
|
||||
.store(new_size as i64, AtomicOrdering::SeqCst);
|
||||
self.logical_size_initialized
|
||||
.store(true, AtomicOrdering::SeqCst);
|
||||
|
||||
// Also set the value in the prometheus gauge. Same race condition
|
||||
// here as in `update_current_logical_size`.
|
||||
self.metrics.current_logical_size_gauge.set(new_size as i64);
|
||||
// other.
|
||||
match logical_size.current_size() {
|
||||
Ok(new_current_size) => self
|
||||
.metrics
|
||||
.current_logical_size_gauge
|
||||
.set(new_current_size.size()),
|
||||
Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
@@ -1446,7 +1571,15 @@ impl Timeline {
|
||||
Ok(new_delta_path)
|
||||
}
|
||||
|
||||
pub fn compact(&self) -> Result<()> {
|
||||
pub fn compact(&self) -> anyhow::Result<()> {
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timeline was just created
|
||||
if !last_record_lsn.is_valid() {
|
||||
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
//
|
||||
// High level strategy for compaction / image creation:
|
||||
//
|
||||
|
||||
@@ -936,7 +936,7 @@ impl<'a> DatadirModification<'a> {
|
||||
result?;
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
writer.update_current_logical_size(pending_nblocks * BLCKSZ as i64);
|
||||
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
|
||||
self.pending_nblocks = 0;
|
||||
}
|
||||
|
||||
@@ -948,7 +948,7 @@ impl<'a> DatadirModification<'a> {
|
||||
/// underlying timeline.
|
||||
/// All the modifications in this atomic update are stamped by the specified LSN.
|
||||
///
|
||||
pub fn commit(&mut self) -> Result<()> {
|
||||
pub fn commit(&mut self) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer();
|
||||
let lsn = self.lsn;
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
@@ -964,7 +964,7 @@ impl<'a> DatadirModification<'a> {
|
||||
writer.finish_write(lsn);
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
writer.update_current_logical_size(pending_nblocks * BLCKSZ as i64);
|
||||
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
use crate::config::PageServerConf;
|
||||
use crate::http::models::TenantInfo;
|
||||
use crate::layered_repository::{load_metadata, Repository, Timeline};
|
||||
use crate::repository::RepositoryTimeline;
|
||||
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
|
||||
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
@@ -378,15 +377,7 @@ pub fn get_local_timeline_with_load(
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let repository = get_repository_for_tenant(tenant_id)?;
|
||||
match repository.get_timeline(timeline_id) {
|
||||
Some(RepositoryTimeline::Loaded(loaded_timeline)) => {
|
||||
loaded_timeline.init_logical_size()?;
|
||||
Ok(loaded_timeline)
|
||||
}
|
||||
_ => load_local_timeline(&repository, timeline_id)
|
||||
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}")),
|
||||
}
|
||||
get_repository_for_tenant(tenant_id)?.get_timeline_load(timeline_id)
|
||||
}
|
||||
|
||||
pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow::Result<()> {
|
||||
@@ -470,17 +461,6 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_local_timeline(
|
||||
repo: &Repository,
|
||||
timeline_id: ZTimelineId,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| {
|
||||
format!("Inmem timeline {timeline_id} not found in tenant's repository")
|
||||
})?;
|
||||
inmem_timeline.init_logical_size()?;
|
||||
Ok(inmem_timeline)
|
||||
}
|
||||
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
@@ -489,9 +469,11 @@ pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
|
||||
.iter()
|
||||
.map(|(id, tenant)| {
|
||||
let has_in_progress_downloads = remote_index
|
||||
.tenant_entry(id)
|
||||
.map(|entry| entry.has_in_progress_downloads());
|
||||
.tenant_entry(id)
|
||||
.map(|entry| entry.has_in_progress_downloads());
|
||||
|
||||
// TODO this is not correct when we might have remote storage sync disabled:
|
||||
// we keep `RemoteTimelineIndex` in memory anyway for simplicity and this error message is printed still
|
||||
if has_in_progress_downloads.is_none() {
|
||||
error!("timeline is not found in remote index while it is present in the tenants registry")
|
||||
}
|
||||
@@ -581,7 +563,7 @@ fn attach_downloaded_tenant(
|
||||
|
||||
// and then load its layers in memory
|
||||
for timeline_id in downloaded_timelines {
|
||||
let _ = load_local_timeline(repo, timeline_id).with_context(|| {
|
||||
repo.get_timeline_load(timeline_id).with_context(|| {
|
||||
format!(
|
||||
"Failed to register add local timeline for tenant {}",
|
||||
repo.tenant_id(),
|
||||
|
||||
@@ -315,18 +315,20 @@ pub async fn handle_walreceiver_connection(
|
||||
|
||||
// Send zenith feedback message.
|
||||
// Regular standby_status_update fields are put into this message.
|
||||
let zenith_status_update = ReplicationFeedback {
|
||||
current_timeline_size: timeline.get_current_logical_size() as u64,
|
||||
let status_update = ReplicationFeedback {
|
||||
current_timeline_size: timeline
|
||||
.get_current_logical_size()
|
||||
.context("Status update creation failed to get current logical size")?,
|
||||
ps_writelsn: write_lsn,
|
||||
ps_flushlsn: flush_lsn,
|
||||
ps_applylsn: apply_lsn,
|
||||
ps_replytime: ts,
|
||||
};
|
||||
|
||||
debug!("zenith_status_update {zenith_status_update:?}");
|
||||
debug!("zenith_status_update {status_update:?}");
|
||||
|
||||
let mut data = BytesMut::new();
|
||||
zenith_status_update.serialize(&mut data)?;
|
||||
status_update.serialize(&mut data)?;
|
||||
physical_stream
|
||||
.as_mut()
|
||||
.zenith_status_update(data.len() as u64, &data)
|
||||
|
||||
@@ -67,11 +67,21 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
|
||||
|
||||
# But all others are broken
|
||||
for n in range(1, 4):
|
||||
(tenant, timeline, pg) = tenant_timelines[n]
|
||||
with pytest.raises(Exception, match="Cannot load local timeline") as err:
|
||||
|
||||
# First timeline would fail instantly due to corrupt metadata file
|
||||
(_tenant, _timeline, pg) = tenant_timelines[1]
|
||||
with pytest.raises(Exception, match="Cannot load local timeline") as err:
|
||||
pg.start()
|
||||
log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}")
|
||||
|
||||
# The other timelines fail only when their layers are queried during basebackup: we don't check layer file contents on startup, when loading the timeline
|
||||
for n in range(2, 4):
|
||||
(_tenant, _timeline, pg) = tenant_timelines[n]
|
||||
with pytest.raises(Exception, match="extracting base backup failed") as err:
|
||||
pg.start()
|
||||
log.info(f"compute startup failed as expected: {err}")
|
||||
log.info(
|
||||
f"compute startup failed lazily for timeline with corrupt layers, during basebackup preparation: {err}"
|
||||
)
|
||||
|
||||
|
||||
def test_create_multiple_timelines_parallel(neon_simple_env: NeonEnv):
|
||||
|
||||
@@ -10,6 +10,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserverHttpClient,
|
||||
Postgres,
|
||||
assert_timeline_local,
|
||||
wait_for_last_flush_lsn,
|
||||
@@ -23,11 +24,7 @@ def test_timeline_size(neon_simple_env: NeonEnv):
|
||||
new_timeline_id = env.neon_cli.create_branch("test_timeline_size", "empty")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
timeline_details = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
|
||||
assert (
|
||||
timeline_details["local"]["current_logical_size"]
|
||||
== timeline_details["local"]["current_logical_size_non_incremental"]
|
||||
)
|
||||
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
|
||||
|
||||
pgmain = env.postgres.create_start("test_timeline_size")
|
||||
log.info("postgres is running on 'test_timeline_size' branch")
|
||||
@@ -61,17 +58,14 @@ def test_timeline_size(neon_simple_env: NeonEnv):
|
||||
|
||||
def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
new_timeline_id = env.neon_cli.create_branch("test_timeline_size", "empty")
|
||||
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_createdropdb", "empty")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
|
||||
timeline_details = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
|
||||
assert (
|
||||
timeline_details["local"]["current_logical_size"]
|
||||
== timeline_details["local"]["current_logical_size_non_incremental"]
|
||||
)
|
||||
|
||||
pgmain = env.postgres.create_start("test_timeline_size")
|
||||
log.info("postgres is running on 'test_timeline_size' branch")
|
||||
pgmain = env.postgres.create_start("test_timeline_size_createdropdb")
|
||||
log.info("postgres is running on 'test_timeline_size_createdropdb' branch")
|
||||
|
||||
with closing(pgmain.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -81,6 +75,10 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
|
||||
local_details["current_logical_size"]
|
||||
== local_details["current_logical_size_non_incremental"]
|
||||
)
|
||||
assert (
|
||||
timeline_details["local"]["current_logical_size_non_incremental"]
|
||||
== local_details["current_logical_size_non_incremental"]
|
||||
), "no writes should not change the incremental logical size"
|
||||
|
||||
cur.execute("CREATE DATABASE foodb")
|
||||
with closing(pgmain.connect(dbname="foodb")) as conn:
|
||||
@@ -140,13 +138,10 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
|
||||
|
||||
def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
client = env.pageserver.http_client()
|
||||
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_quota")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
|
||||
assert (
|
||||
res["local"]["current_logical_size"] == res["local"]["current_logical_size_non_incremental"]
|
||||
)
|
||||
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
|
||||
|
||||
pgmain = env.postgres.create_start(
|
||||
"test_timeline_size_quota",
|
||||
@@ -211,6 +206,12 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
|
||||
pg_cluster_size = cur.fetchone()
|
||||
log.info(f"pg_cluster_size = {pg_cluster_size}")
|
||||
|
||||
new_res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
|
||||
assert (
|
||||
new_res["local"]["current_logical_size"]
|
||||
== new_res["local"]["current_logical_size_non_incremental"]
|
||||
), "after the WAL is streamed, current_logical_size is expected to be calculated and to be equal its non-incremental value"
|
||||
|
||||
|
||||
def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
@@ -425,3 +426,22 @@ def assert_physical_size(env: NeonEnv, tenant_id: ZTenantId, timeline_id: ZTimel
|
||||
== res["local"]["current_physical_size_non_incremental"]
|
||||
)
|
||||
assert res["local"]["current_physical_size"] == get_timeline_dir_size(timeline_path)
|
||||
|
||||
|
||||
# Timeline logical size initialization is an asynchronous background task that runs once,
|
||||
# try a few times to ensure it's activated properly
|
||||
def wait_for_timeline_size_init(
|
||||
client: NeonPageserverHttpClient, tenant: ZTenantId, timeline: ZTimelineId
|
||||
):
|
||||
for i in range(10):
|
||||
timeline_details = assert_timeline_local(client, tenant, timeline)
|
||||
if (
|
||||
timeline_details["local"]["current_logical_size"]
|
||||
== timeline_details["local"]["current_logical_size_non_incremental"]
|
||||
):
|
||||
return
|
||||
log.info(f"waiting for current_logical_size of a timeline to be calculated, iteration {i}")
|
||||
time.sleep(1)
|
||||
raise Exception(
|
||||
f"timed out while waiting for current_logical_size of a timeline to reach its non-incremental value, details: {timeline_details}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user