diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ef18129504..710014de98 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -75,7 +75,7 @@ fn get_config(request: &Request) -> &'static PageServerConf { // Helper functions to construct a LocalTimelineInfo struct for a timeline fn local_timeline_info_from_loaded_timeline( - timeline: &Timeline, + timeline: &Arc, include_non_incremental_logical_size: bool, include_non_incremental_physical_size: bool, ) -> anyhow::Result { @@ -106,7 +106,11 @@ fn local_timeline_info_from_loaded_timeline( prev_record_lsn: Some(timeline.get_prev_record_lsn()), latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(), timeline_state: LocalTimelineState::Loaded, - current_logical_size: Some(timeline.get_current_logical_size()), + current_logical_size: Some( + timeline + .get_current_logical_size() + .context("Timeline info creation failed to get current logical size")?, + ), current_physical_size: Some(timeline.get_physical_size()), current_logical_size_non_incremental: if include_non_incremental_logical_size { Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?) @@ -212,7 +216,7 @@ async fn timeline_create_handler(mut request: Request) -> Result { // Created. Construct a TimelineInfo for it. - let local_info = local_timeline_info_from_loaded_timeline(new_timeline.as_ref(), false, false)?; + let local_info = local_timeline_info_from_loaded_timeline(&new_timeline, false, false)?; Ok(Some(TimelineInfo { tenant_id, timeline_id: new_timeline_id, diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 73c30b51b8..9d405b0033 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -136,14 +136,11 @@ impl Repository { } /// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded. - pub fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result> { + pub fn get_timeline_load(&self, timeline_id: ZTimelineId) -> Result> { let mut timelines = self.timelines.lock().unwrap(); - match self.get_timeline_load_internal(timelineid, &mut timelines)? { + match self.get_timeline_load_internal(timeline_id, &mut timelines)? { Some(local_loaded_timeline) => Ok(local_loaded_timeline), - None => anyhow::bail!( - "cannot get local timeline: unknown timeline id: {}", - timelineid - ), + None => anyhow::bail!("cannot get local timeline, unknown timeline id: {timeline_id}"), } } @@ -559,33 +556,34 @@ impl Repository { timeline_id: ZTimelineId, timelines: &mut HashMap, ) -> anyhow::Result>> { - match timelines.get(&timeline_id) { + Ok(match timelines.get(&timeline_id) { Some(entry) => match entry { LayeredTimelineEntry::Loaded(local_timeline) => { debug!("timeline {timeline_id} found loaded into memory"); - return Ok(Some(Arc::clone(local_timeline))); + Some(Arc::clone(local_timeline)) + } + LayeredTimelineEntry::Unloaded { .. } => { + debug!( + "timeline {timeline_id} found on a local disk, but not loaded into the memory, loading" + ); + let timeline = self.load_local_timeline(timeline_id, timelines)?; + let was_loaded = timelines.insert( + timeline_id, + LayeredTimelineEntry::Loaded(Arc::clone(&timeline)), + ); + ensure!( + was_loaded.is_none() + || matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. })), + "assertion failure, inserted wrong timeline in an incorrect state" + ); + Some(timeline) } - LayeredTimelineEntry::Unloaded { .. } => {} }, None => { debug!("timeline {timeline_id} not found"); - return Ok(None); + None } - }; - debug!( - "timeline {timeline_id} found on a local disk, but not loaded into the memory, loading" - ); - let timeline = self.load_local_timeline(timeline_id, timelines)?; - let was_loaded = timelines.insert( - timeline_id, - LayeredTimelineEntry::Loaded(Arc::clone(&timeline)), - ); - ensure!( - was_loaded.is_none() - || matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. })), - "assertion failure, inserted wrong timeline in an incorrect state" - ); - Ok(Some(timeline)) + }) } fn load_local_timeline( diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index 8b90cc4e6b..fd719812a3 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -5,17 +5,17 @@ use bytes::Bytes; use fail::fail_point; use itertools::Itertools; use metrics::core::{AtomicU64, GenericCounter}; -use once_cell::sync::Lazy; +use once_cell::sync::{Lazy, OnceCell}; use tracing::*; use std::cmp::{max, min, Ordering}; use std::collections::{HashMap, HashSet}; -use std::fs; use std::ops::{Deref, Range}; use std::path::PathBuf; use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError}; +use std::sync::{mpsc, Arc, Mutex, MutexGuard, RwLock, TryLockError}; use std::time::{Duration, Instant, SystemTime}; +use std::{fs, thread}; use metrics::{ register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec, @@ -137,13 +137,13 @@ static CURRENT_PHYSICAL_SIZE: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); -static CURRENT_LOGICAL_SIZE: Lazy = Lazy::new(|| { - register_int_gauge_vec!( +static CURRENT_LOGICAL_SIZE: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( "pageserver_current_logical_size", "Current logical size grouped by timeline", &["tenant_id", "timeline_id"] ) - .expect("failed to define a metric") + .expect("failed to define current logical size metric") }); // Metrics for cloud upload. These metrics reflect data uploaded to cloud storage, @@ -242,7 +242,7 @@ struct TimelineMetrics { pub wait_lsn_time_histo: Histogram, pub current_physical_size_gauge: UIntGauge, /// copy of LayeredTimeline.current_logical_size - pub current_logical_size_gauge: IntGauge, + pub current_logical_size_gauge: UIntGauge, } impl TimelineMetrics { @@ -389,6 +389,37 @@ pub struct Timeline { repartition_threshold: u64, /// Current logical size of the "datadir", at the last LSN. + current_logical_size: LogicalSize, + // TODO task management should be done outside timeline, managed along with other tasks. + #[allow(clippy::type_complexity)] + initial_size_computation_task: + Mutex>, mpsc::Receiver<()>)>>, + + /// Information about the last processed message by the WAL receiver, + /// or None if WAL receiver has not received anything for this timeline + /// yet. + pub last_received_wal: Mutex>, + + /// Relation size cache + pub rel_size_cache: RwLock>, +} + +/// Internal structure to hold all data needed for logical size calculation. +/// Calculation consists of two parts: +/// 1. Initial size calculation. That might take a long time, because it requires +/// reading all layers containing relation sizes up to the `initial_part_end`. +/// 2. Collecting an incremental part and adding that to the initial size. +/// Increments are appended on walreceiver writing new timeline data, +/// which result in increase or decrease of the logical size. +struct LogicalSize { + /// Size, potentially slow to compute, derived from all layers located locally on this node's FS. + /// Might require reading multiple layers, and even ancestor's layers, to collect the size. + /// + /// NOTE: initial size is not a constant and will change between restarts. + initial_logical_size: OnceCell, + /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines. + initial_part_end: Option, + /// All other size changes after startup, combined together. /// /// Size shouldn't ever be negative, but this is signed for two reasons: /// @@ -407,22 +438,82 @@ pub struct Timeline { /// /// Note that we also expose a copy of this value as a prometheus metric, /// see `current_logical_size_gauge`. Use the `update_current_logical_size` - /// and `set_current_logical_size` functions to modify this, they will - /// also keep the prometheus metric in sync. - current_logical_size: AtomicI64, - // TODO we don't have a good, API to ensure on a compilation level - // that the timeline passes all initialization. - // Hence we ensure that we init at least once for every timeline - // and keep this flag to avoid potentually long recomputes. - logical_size_initialized: AtomicBool, + /// to modify this, it will also keep the prometheus metric in sync. + size_added_after_initial: AtomicI64, +} - /// Information about the last processed message by the WAL receiver, - /// or None if WAL receiver has not received anything for this timeline - /// yet. - pub last_received_wal: Mutex>, +/// Normalized current size, that the data in pageserver occupies. +#[derive(Debug, Clone, Copy)] +enum CurrentLogicalSize { + /// The size is not yet calculated to the end, this is an intermediate result, + /// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative, + /// yet total logical size cannot be below 0. + Approximate(u64), + // Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are + // available for observation without any calculations. + Exact(u64), +} - /// Relation size cache - pub rel_size_cache: RwLock>, +impl CurrentLogicalSize { + fn size(&self) -> u64 { + *match self { + Self::Approximate(size) => size, + Self::Exact(size) => size, + } + } +} + +impl LogicalSize { + fn empty_initial() -> Self { + Self { + initial_logical_size: OnceCell::with_value(0), + initial_part_end: None, + size_added_after_initial: AtomicI64::new(0), + } + } + + fn deferred_initial(compute_to: Lsn) -> Self { + Self { + initial_logical_size: OnceCell::new(), + initial_part_end: Some(compute_to), + size_added_after_initial: AtomicI64::new(0), + } + } + + fn current_size(&self) -> anyhow::Result { + let size_increment = self.size_added_after_initial.load(AtomicOrdering::Acquire); + match self.initial_logical_size.get() { + Some(initial_size) => { + let absolute_size_increment = u64::try_from( + size_increment + .checked_abs() + .with_context(|| format!("Size added after initial {size_increment} is not expected to be i64::MIN"))?, + ).with_context(|| format!("Failed to convert size increment {size_increment} to u64"))?; + + if size_increment < 0 { + initial_size.checked_sub(absolute_size_increment) + } else { + initial_size.checked_add(absolute_size_increment) + }.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}")) + .map(CurrentLogicalSize::Exact) + } + None => { + let non_negative_size_increment = size_increment.max(0); + u64::try_from(non_negative_size_increment) + .with_context(|| { + format!( + "Failed to convert size increment {non_negative_size_increment} to u64" + ) + }) + .map(CurrentLogicalSize::Approximate) + } + } + } + + fn increment_size(&self, delta: i64) { + self.size_added_after_initial + .fetch_add(delta, AtomicOrdering::SeqCst); + } } pub struct WalReceiverInfo { @@ -491,7 +582,9 @@ impl Timeline { /// the Repository implementation may incorrectly return a value from an ancestor /// branch, for example, or waste a lot of cycles chasing the non-existing key. /// - pub fn get(&self, key: Key, lsn: Lsn) -> Result { + pub fn get(&self, key: Key, lsn: Lsn) -> anyhow::Result { + anyhow::ensure!(lsn.is_valid(), "Invalid LSN"); + // Check the page cache. We will get back the most recent page with lsn <= `lsn`. // The cached image can be returned directly if there is no WAL between the cached image // and requested LSN. The cached image can also be used to reduce the amount of WAL needed @@ -694,6 +787,8 @@ impl Timeline { walredo_mgr: Arc, upload_layers: bool, ) -> Timeline { + let disk_consistent_lsn = metadata.disk_consistent_lsn(); + let mut result = Timeline { conf, tenant_conf, @@ -705,12 +800,12 @@ impl Timeline { // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'. last_record_lsn: SeqWait::new(RecordLsn { - last: metadata.disk_consistent_lsn(), + last: disk_consistent_lsn, prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)), }), - disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn().0), + disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0), - last_freeze_at: AtomicLsn::new(metadata.disk_consistent_lsn().0), + last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0), last_freeze_ts: RwLock::new(Instant::now()), ancestor_timeline: ancestor, @@ -733,8 +828,16 @@ impl Timeline { latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()), initdb_lsn: metadata.initdb_lsn(), - current_logical_size: AtomicI64::new(0), - logical_size_initialized: AtomicBool::new(false), + current_logical_size: if disk_consistent_lsn.is_valid() { + // we're creating timeline data with some layer files existing locally, + // need to recalculate timeline's logical size based on data in the layers. + LogicalSize::deferred_initial(disk_consistent_lsn) + } else { + // we're creating timeline data without any layers existing locally, + // initial logical size is 0. + LogicalSize::empty_initial() + }, + initial_size_computation_task: Mutex::new(None), partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))), repartition_threshold: 0, @@ -835,92 +938,114 @@ impl Timeline { Ok(()) } - /// (Re-)calculate the logical size of the database at the latest LSN. + /// Retrieve current logical size of the timeline. /// - /// This can be a slow operation. - pub fn init_logical_size(&self) -> Result<()> { - if self.logical_size_initialized.load(AtomicOrdering::Acquire) { - return Ok(()); - } + /// The size could be lagging behind the actual number, in case + /// the initial size calculation has not been run (gets triggered on the first size access). + pub fn get_current_logical_size(self: &Arc) -> anyhow::Result { + let current_size = self.current_logical_size.current_size()?; + debug!("Current size: {current_size:?}"); - // Try a fast-path first: - // Copy logical size from ancestor timeline if there has been no changes on this - // branch, and no changes on the ancestor branch since the branch point. - if self.get_ancestor_lsn() == self.get_last_record_lsn() && self.ancestor_timeline.is_some() + let size = current_size.size(); + if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) = + (current_size, self.current_logical_size.initial_part_end) { - let ancestor = self.get_ancestor_timeline()?; - let ancestor_logical_size = ancestor.get_current_logical_size(); - // Check LSN after getting logical size to exclude race condition - // when ancestor timeline is concurrently updated. - // - // Logical size 0 means that it was not initialized, so don't believe that. - if ancestor_logical_size != 0 && ancestor.get_last_record_lsn() == self.ancestor_lsn { - self.set_current_logical_size(ancestor_logical_size); - debug!( - "logical size copied from ancestor: {}", - ancestor_logical_size - ); - return Ok(()); - } + self.try_spawn_size_init_task(init_lsn); } - let timer = self.metrics.init_logical_size_histo.start_timer(); - - // Have to calculate it the hard way - let last_lsn = self.get_last_record_lsn(); - let logical_size = self.get_current_logical_size_non_incremental(last_lsn)?; - self.set_current_logical_size(logical_size); - debug!("calculated logical size the hard way: {}", logical_size); - - timer.stop_and_record(); - Ok(()) + Ok(size) } - /// Retrieve current logical size of the timeline - /// - /// NOTE: counted incrementally, includes ancestors. - pub fn get_current_logical_size(&self) -> u64 { - let current_logical_size = self.current_logical_size.load(AtomicOrdering::Acquire); - match u64::try_from(current_logical_size) { - Ok(sz) => sz, + fn try_spawn_size_init_task(self: &Arc, init_lsn: Lsn) { + let timeline_id = self.timeline_id; + + let mut task_guard = match self.initial_size_computation_task.try_lock() { + Ok(guard) => guard, Err(_) => { - error!( - "current_logical_size is out of range: {}", - current_logical_size - ); - 0 + debug!("Skipping timeline logical size init: task lock is taken already"); + return; + } + }; + + if let Some((old_task, task_finish_signal)) = task_guard.take() { + // TODO rust 1.61 would allow to remove `task_finish_signal` entirely and call `old_task.is_finished()` instead + match task_finish_signal.try_recv() { + // task has either signaled successfully that it finished or panicked and dropped the sender part without signalling + Ok(()) | Err(mpsc::TryRecvError::Disconnected) => { + match old_task.join() { + // we're here due to OnceCell::get not returning the value + Ok(Ok(())) => { + error!("Timeline {timeline_id} size init task finished, yet the size was not updated, rescheduling the computation") + } + Ok(Err(task_error)) => { + error!("Error during timeline {timeline_id} size init: {task_error:?}") + } + Err(e) => error!("Timeline {timeline_id} size init task panicked: {e:?}"), + } + } + // task had not yet finished: no signal was sent and the sender channel is not dropped + Err(mpsc::TryRecvError::Empty) => { + // let the task finish + *task_guard = Some((old_task, task_finish_signal)); + return; + } } } + + if task_guard.is_none() { + let thread_timeline = Arc::clone(self); + let (finish_sender, finish_receiver) = mpsc::channel(); + + match thread::Builder::new() + .name(format!( + "Timeline {timeline_id} initial logical size calculation" + )) + .spawn(move || { + let _enter = info_span!("initial_logical_size_calculation", timeline = %timeline_id).entered(); + let calculated_size = thread_timeline.calculate_logical_size(init_lsn)?; + match thread_timeline.current_logical_size.initial_logical_size.set(calculated_size) { + Ok(()) => info!("Successfully calculated initial logical size"), + Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), + } + + finish_sender.send(()).ok(); + Ok(()) + }) { + Ok(guard) => *task_guard = Some((guard, finish_receiver)), + Err(e) => error!("Failed to spawn timeline {timeline_id} size init task: {e}"), + } + } + } + + /// Calculate the logical size of the database at the latest LSN. + /// + /// NOTE: counted incrementally, includes ancestors, this can be a slow operation. + fn calculate_logical_size(&self, up_to_lsn: Lsn) -> anyhow::Result { + info!("Calculating logical size for timeline {}", self.timeline_id); + let timer = self.metrics.init_logical_size_histo.start_timer(); + let logical_size = self.get_current_logical_size_non_incremental(up_to_lsn)?; + debug!("calculated logical size: {logical_size}"); + timer.stop_and_record(); + Ok(logical_size) } /// Update current logical size, adding `delta' to the old value. fn update_current_logical_size(&self, delta: i64) { - let new_size = self - .current_logical_size - .fetch_add(delta, AtomicOrdering::SeqCst); + let logical_size = &self.current_logical_size; + logical_size.increment_size(delta); // Also set the value in the prometheus gauge. Note that // there is a race condition here: if this is is called by two // threads concurrently, the prometheus gauge might be set to // one value while current_logical_size is set to the - // other. Currently, only initialization and the WAL receiver - // updates the logical size, and they don't run concurrently, - // so it cannot happen. And even if it did, it wouldn't be - // very serious, the metrics would just be slightly off until - // the next update. - self.metrics.current_logical_size_gauge.set(new_size); - } - - /// Set current logical size. - fn set_current_logical_size(&self, new_size: u64) { - self.current_logical_size - .store(new_size as i64, AtomicOrdering::SeqCst); - self.logical_size_initialized - .store(true, AtomicOrdering::SeqCst); - - // Also set the value in the prometheus gauge. Same race condition - // here as in `update_current_logical_size`. - self.metrics.current_logical_size_gauge.set(new_size as i64); + // other. + match logical_size.current_size() { + Ok(new_current_size) => self + .metrics + .current_logical_size_gauge + .set(new_current_size.size()), + Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"), + } } /// @@ -1446,7 +1571,15 @@ impl Timeline { Ok(new_delta_path) } - pub fn compact(&self) -> Result<()> { + pub fn compact(&self) -> anyhow::Result<()> { + let last_record_lsn = self.get_last_record_lsn(); + + // Last record Lsn could be zero in case the timelie was just created + if !last_record_lsn.is_valid() { + warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}"); + return Ok(()); + } + // // High level strategy for compaction / image creation: // diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 0f0bb1ed53..24002a36e5 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -936,7 +936,7 @@ impl<'a> DatadirModification<'a> { result?; if pending_nblocks != 0 { - writer.update_current_logical_size(pending_nblocks * BLCKSZ as i64); + writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); self.pending_nblocks = 0; } @@ -948,7 +948,7 @@ impl<'a> DatadirModification<'a> { /// underlying timeline. /// All the modifications in this atomic update are stamped by the specified LSN. /// - pub fn commit(&mut self) -> Result<()> { + pub fn commit(&mut self) -> anyhow::Result<()> { let writer = self.tline.writer(); let lsn = self.lsn; let pending_nblocks = self.pending_nblocks; @@ -964,7 +964,7 @@ impl<'a> DatadirModification<'a> { writer.finish_write(lsn); if pending_nblocks != 0 { - writer.update_current_logical_size(pending_nblocks * BLCKSZ as i64); + writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); } Ok(()) diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 4a907ac0e1..fec8a80b9b 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -4,7 +4,6 @@ use crate::config::PageServerConf; use crate::http::models::TenantInfo; use crate::layered_repository::{load_metadata, Repository, Timeline}; -use crate::repository::RepositoryTimeline; use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex}; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; use crate::tenant_config::TenantConfOpt; @@ -378,15 +377,7 @@ pub fn get_local_timeline_with_load( tenant_id: ZTenantId, timeline_id: ZTimelineId, ) -> anyhow::Result> { - let repository = get_repository_for_tenant(tenant_id)?; - match repository.get_timeline(timeline_id) { - Some(RepositoryTimeline::Loaded(loaded_timeline)) => { - loaded_timeline.init_logical_size()?; - Ok(loaded_timeline) - } - _ => load_local_timeline(&repository, timeline_id) - .with_context(|| format!("Failed to load local timeline for tenant {tenant_id}")), - } + get_repository_for_tenant(tenant_id)?.get_timeline_load(timeline_id) } pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow::Result<()> { @@ -470,17 +461,6 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any Ok(()) } -fn load_local_timeline( - repo: &Repository, - timeline_id: ZTimelineId, -) -> anyhow::Result> { - let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| { - format!("Inmem timeline {timeline_id} not found in tenant's repository") - })?; - inmem_timeline.init_logical_size()?; - Ok(inmem_timeline) -} - /// /// Get list of tenants, for the mgmt API /// @@ -489,9 +469,11 @@ pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec { .iter() .map(|(id, tenant)| { let has_in_progress_downloads = remote_index - .tenant_entry(id) - .map(|entry| entry.has_in_progress_downloads()); + .tenant_entry(id) + .map(|entry| entry.has_in_progress_downloads()); + // TODO this is not correct when we might have remote storage sync disabled: + // we keep `RemoteTimelineIndex` in memory anyway for simplicity and this error message is printed still if has_in_progress_downloads.is_none() { error!("timeline is not found in remote index while it is present in the tenants registry") } @@ -581,7 +563,7 @@ fn attach_downloaded_tenant( // and then load its layers in memory for timeline_id in downloaded_timelines { - let _ = load_local_timeline(repo, timeline_id).with_context(|| { + repo.get_timeline_load(timeline_id).with_context(|| { format!( "Failed to register add local timeline for tenant {}", repo.tenant_id(), diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index f816198eda..2c29a56ad2 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -315,18 +315,20 @@ pub async fn handle_walreceiver_connection( // Send zenith feedback message. // Regular standby_status_update fields are put into this message. - let zenith_status_update = ReplicationFeedback { - current_timeline_size: timeline.get_current_logical_size() as u64, + let status_update = ReplicationFeedback { + current_timeline_size: timeline + .get_current_logical_size() + .context("Status update creation failed to get current logical size")?, ps_writelsn: write_lsn, ps_flushlsn: flush_lsn, ps_applylsn: apply_lsn, ps_replytime: ts, }; - debug!("zenith_status_update {zenith_status_update:?}"); + debug!("zenith_status_update {status_update:?}"); let mut data = BytesMut::new(); - zenith_status_update.serialize(&mut data)?; + status_update.serialize(&mut data)?; physical_stream .as_mut() .zenith_status_update(data.len() as u64, &data) diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index bf44dfd949..31b54f827b 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -67,11 +67,21 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100 # But all others are broken - for n in range(1, 4): - (tenant, timeline, pg) = tenant_timelines[n] - with pytest.raises(Exception, match="Cannot load local timeline") as err: + + # First timeline would fail instantly due to corrupt metadata file + (_tenant, _timeline, pg) = tenant_timelines[1] + with pytest.raises(Exception, match="Cannot load local timeline") as err: + pg.start() + log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") + + # Yet other timelines will fail when their layers will be queried during basebackup: we don't check layer file contents on startup, when loading the timeline + for n in range(2, 4): + (_tenant, _timeline, pg) = tenant_timelines[n] + with pytest.raises(Exception, match="extracting base backup failed") as err: pg.start() - log.info(f"compute startup failed as expected: {err}") + log.info( + f"compute startup failed lazily for timeline with corrupt layers, during basebackup preparation: {err}" + ) def test_create_multiple_timelines_parallel(neon_simple_env: NeonEnv): diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index aba8567541..6fbc430e80 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -10,6 +10,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, + NeonPageserverHttpClient, Postgres, assert_timeline_local, wait_for_last_flush_lsn, @@ -23,11 +24,7 @@ def test_timeline_size(neon_simple_env: NeonEnv): new_timeline_id = env.neon_cli.create_branch("test_timeline_size", "empty") client = env.pageserver.http_client() - timeline_details = assert_timeline_local(client, env.initial_tenant, new_timeline_id) - assert ( - timeline_details["local"]["current_logical_size"] - == timeline_details["local"]["current_logical_size_non_incremental"] - ) + wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id) pgmain = env.postgres.create_start("test_timeline_size") log.info("postgres is running on 'test_timeline_size' branch") @@ -61,17 +58,14 @@ def test_timeline_size(neon_simple_env: NeonEnv): def test_timeline_size_createdropdb(neon_simple_env: NeonEnv): env = neon_simple_env - new_timeline_id = env.neon_cli.create_branch("test_timeline_size", "empty") + new_timeline_id = env.neon_cli.create_branch("test_timeline_size_createdropdb", "empty") client = env.pageserver.http_client() + wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id) timeline_details = assert_timeline_local(client, env.initial_tenant, new_timeline_id) - assert ( - timeline_details["local"]["current_logical_size"] - == timeline_details["local"]["current_logical_size_non_incremental"] - ) - pgmain = env.postgres.create_start("test_timeline_size") - log.info("postgres is running on 'test_timeline_size' branch") + pgmain = env.postgres.create_start("test_timeline_size_createdropdb") + log.info("postgres is running on 'test_timeline_size_createdropdb' branch") with closing(pgmain.connect()) as conn: with conn.cursor() as cur: @@ -81,6 +75,10 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv): local_details["current_logical_size"] == local_details["current_logical_size_non_incremental"] ) + assert ( + timeline_details["local"]["current_logical_size_non_incremental"] + == local_details["current_logical_size_non_incremental"] + ), "no writes should not change the incremental logical size" cur.execute("CREATE DATABASE foodb") with closing(pgmain.connect(dbname="foodb")) as conn: @@ -140,13 +138,10 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60 def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() + client = env.pageserver.http_client() new_timeline_id = env.neon_cli.create_branch("test_timeline_size_quota") - client = env.pageserver.http_client() - res = assert_timeline_local(client, env.initial_tenant, new_timeline_id) - assert ( - res["local"]["current_logical_size"] == res["local"]["current_logical_size_non_incremental"] - ) + wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id) pgmain = env.postgres.create_start( "test_timeline_size_quota", @@ -211,6 +206,12 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder): pg_cluster_size = cur.fetchone() log.info(f"pg_cluster_size = {pg_cluster_size}") + new_res = assert_timeline_local(client, env.initial_tenant, new_timeline_id) + assert ( + new_res["local"]["current_logical_size"] + == new_res["local"]["current_logical_size_non_incremental"] + ), "after the WAL is streamed, current_logical_size is expected to be calculated and to be equal its non-incremental value" + def test_timeline_physical_size_init(neon_simple_env: NeonEnv): env = neon_simple_env @@ -425,3 +426,22 @@ def assert_physical_size(env: NeonEnv, tenant_id: ZTenantId, timeline_id: ZTimel == res["local"]["current_physical_size_non_incremental"] ) assert res["local"]["current_physical_size"] == get_timeline_dir_size(timeline_path) + + +# Timeline logical size initialization is an asynchronous background task that runs once, +# try a few times to ensure it's activated properly +def wait_for_timeline_size_init( + client: NeonPageserverHttpClient, tenant: ZTenantId, timeline: ZTimelineId +): + for i in range(10): + timeline_details = assert_timeline_local(client, tenant, timeline) + if ( + timeline_details["local"]["current_logical_size"] + == timeline_details["local"]["current_logical_size_non_incremental"] + ): + return + log.info(f"waiting for current_logical_size of a timeline to be calculated, iteration {i}") + time.sleep(1) + raise Exception( + f"timed out while waiting for current_logical_size of a timeline to reach its non-incremental value, details: {timeline_details}" + )