diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7b5adc4ca6..7edcb9c249 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -124,7 +124,7 @@ pub(super) enum FlushLoopState { /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap #[derive(Debug, Clone, PartialEq, Eq)] -pub struct Hole { +pub(crate) struct Hole { key_range: Range, coverage_size: usize, } @@ -565,19 +565,19 @@ impl From for PageReconstructError { /// Public interface functions impl Timeline { /// Get the LSN where this branch was created - pub fn get_ancestor_lsn(&self) -> Lsn { + pub(crate) fn get_ancestor_lsn(&self) -> Lsn { self.ancestor_lsn } /// Get the ancestor's timeline id - pub fn get_ancestor_timeline_id(&self) -> Option { + pub(crate) fn get_ancestor_timeline_id(&self) -> Option { self.ancestor_timeline .as_ref() .map(|ancestor| ancestor.timeline_id) } /// Lock and get timeline's GC cutoff - pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard { + pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard { self.latest_gc_cutoff_lsn.read() } @@ -733,27 +733,27 @@ impl Timeline { } /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev. - pub fn get_last_record_lsn(&self) -> Lsn { + pub(crate) fn get_last_record_lsn(&self) -> Lsn { self.last_record_lsn.load().last } - pub fn get_prev_record_lsn(&self) -> Lsn { + pub(crate) fn get_prev_record_lsn(&self) -> Lsn { self.last_record_lsn.load().prev } /// Atomically get both last and prev. - pub fn get_last_record_rlsn(&self) -> RecordLsn { + pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn { self.last_record_lsn.load() } - pub fn get_disk_consistent_lsn(&self) -> Lsn { + pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn { self.disk_consistent_lsn.load() } /// remote_consistent_lsn from the perspective of the tenant's current generation, /// not validated with control plane yet. /// See [`Self::get_remote_consistent_lsn_visible`]. - pub fn get_remote_consistent_lsn_projected(&self) -> Option { + pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option { if let Some(remote_client) = &self.remote_client { remote_client.remote_consistent_lsn_projected() } else { @@ -764,7 +764,7 @@ impl Timeline { /// remote_consistent_lsn which the tenant is guaranteed not to go backward from, /// i.e. a value of remote_consistent_lsn_projected which has undergone /// generation validation in the deletion queue. - pub fn get_remote_consistent_lsn_visible(&self) -> Option { + pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option { if let Some(remote_client) = &self.remote_client { remote_client.remote_consistent_lsn_visible() } else { @@ -775,7 +775,7 @@ impl Timeline { /// The sum of the file size of all historic layers in the layer map. /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. - pub async fn layer_size_sum(&self) -> u64 { + pub(crate) async fn layer_size_sum(&self) -> u64 { let guard = self.layers.read().await; let layer_map = guard.layer_map(); let mut size = 0; @@ -785,7 +785,7 @@ impl Timeline { size } - pub fn resident_physical_size(&self) -> u64 { + pub(crate) fn resident_physical_size(&self) -> u64 { self.metrics.resident_physical_size_get() } @@ -861,7 +861,7 @@ impl Timeline { } /// Check that it is valid to request operations with that lsn. - pub fn check_lsn_is_in_scope( + pub(crate) fn check_lsn_is_in_scope( &self, lsn: Lsn, latest_gc_cutoff_lsn: &RcuReadGuard, @@ -877,7 +877,7 @@ impl Timeline { /// Flush to disk all data that was written with the put_* functions #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))] - pub async fn freeze_and_flush(&self) -> anyhow::Result<()> { + pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> { self.freeze_inmem_layer(false).await; self.flush_frozen_layers_and_wait().await } @@ -1021,7 +1021,7 @@ impl Timeline { } /// Mutate the timeline with a [`TimelineWriter`]. - pub async fn writer(&self) -> TimelineWriter<'_> { + pub(crate) async fn writer(&self) -> TimelineWriter<'_> { TimelineWriter { tl: self, _write_guard: self.write_lock.lock().await, @@ -1033,7 +1033,7 @@ impl Timeline { /// /// Also flush after a period of time without new data -- it helps /// safekeepers to regard pageserver as caught up and suspend activity. - pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { + pub(crate) async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); let open_layer_size = { let guard = self.layers.read().await; @@ -1071,13 +1071,16 @@ impl Timeline { Ok(()) } - pub fn activate( + pub(crate) fn activate( self: &Arc, broker_client: BrokerClientChannel, background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { - self.spawn_initial_logical_size_computation_task(ctx); + if self.tenant_shard_id.is_zero() { + // Logical size is only maintained accurately on shard zero. + self.spawn_initial_logical_size_computation_task(ctx); + } self.launch_wal_receiver(ctx, broker_client); self.set_state(TimelineState::Active); self.launch_eviction_task(background_jobs_can_start); @@ -1172,7 +1175,7 @@ impl Timeline { self.gate.close().await; } - pub fn set_state(&self, new_state: TimelineState) { + pub(crate) fn set_state(&self, new_state: TimelineState) { match (self.current_state(), new_state) { (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => { info!("Ignoring new state, equal to the existing one: {equal_state_2:?}"); @@ -1192,7 +1195,7 @@ impl Timeline { } } - pub fn set_broken(&self, reason: String) { + pub(crate) fn set_broken(&self, reason: String) { let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture()); let broken_state = TimelineState::Broken { reason, @@ -1206,27 +1209,27 @@ impl Timeline { self.cancel.cancel(); } - pub fn current_state(&self) -> TimelineState { + pub(crate) fn current_state(&self) -> TimelineState { self.state.borrow().clone() } - pub fn is_broken(&self) -> bool { + pub(crate) fn is_broken(&self) -> bool { matches!(&*self.state.borrow(), TimelineState::Broken { .. }) } - pub fn is_active(&self) -> bool { + pub(crate) fn is_active(&self) -> bool { self.current_state() == TimelineState::Active } - pub fn is_stopping(&self) -> bool { + pub(crate) fn is_stopping(&self) -> bool { self.current_state() == TimelineState::Stopping } - pub fn subscribe_for_state_updates(&self) -> watch::Receiver { + pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver { self.state.subscribe() } - pub async fn wait_to_become_active( + pub(crate) async fn wait_to_become_active( &self, _ctx: &RequestContext, // Prepare for use by cancellation ) -> Result<(), TimelineState> { @@ -1251,7 +1254,7 @@ impl Timeline { } } - pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { + pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { let guard = self.layers.read().await; let layer_map = guard.layer_map(); let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); @@ -1275,7 +1278,10 @@ impl Timeline { } #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))] - pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result> { + pub(crate) async fn download_layer( + &self, + layer_file_name: &str, + ) -> anyhow::Result> { let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None); }; @@ -1292,7 +1298,7 @@ impl Timeline { /// Evict just one layer. /// /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`. - pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { + pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { let _gate = self .gate .enter() @@ -1315,7 +1321,7 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10; // Private functions impl Timeline { - pub fn get_lazy_slru_download(&self) -> bool { + pub(crate) fn get_lazy_slru_download(&self) -> bool { let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; tenant_conf .lazy_slru_download @@ -1852,6 +1858,12 @@ impl Timeline { priority: GetLogicalSizePriority, ctx: &RequestContext, ) -> logical_size::CurrentLogicalSize { + if !self.tenant_shard_id.is_zero() { + // Logical size is only accurately maintained on shard zero: when called elsewhere, for example + // when HTTP API is serving a GET for timeline zero, return zero + return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero()); + } + let current_size = self.current_logical_size.current_size(); debug!("Current size: {current_size:?}"); @@ -2094,7 +2106,7 @@ impl Timeline { .expect("only this task sets it"); } - pub fn spawn_ondemand_logical_size_calculation( + pub(crate) fn spawn_ondemand_logical_size_calculation( self: &Arc, lsn: Lsn, cause: LogicalSizeCalculationCause, @@ -2140,6 +2152,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { span::debug_assert_current_span_has_tenant_and_timeline_id(); + // We should never be calculating logical sizes on shard !=0, because these shards do not have + // accurate relation sizes, and they do not emit consumption metrics. + debug_assert!(self.tenant_shard_id.is_zero()); let _guard = self.gate.enter(); @@ -2173,7 +2188,7 @@ impl Timeline { /// # Cancel-Safety /// /// This method is cancellation-safe. - pub async fn calculate_logical_size( + async fn calculate_logical_size( &self, up_to_lsn: Lsn, cause: LogicalSizeCalculationCause, @@ -3422,7 +3437,7 @@ enum DurationRecorder { } impl DurationRecorder { - pub fn till_now(&self) -> DurationRecorder { + fn till_now(&self) -> DurationRecorder { match self { DurationRecorder::NotStarted => { panic!("must only call on recorded measurements") @@ -3433,7 +3448,7 @@ impl DurationRecorder { } } } - pub fn into_recorded(self) -> Option { + fn into_recorded(self) -> Option { match self { DurationRecorder::NotStarted => None, DurationRecorder::Recorded(recorded, _) => Some(recorded), @@ -4636,7 +4651,9 @@ impl Timeline { } } - pub fn get_download_all_remote_layers_task_info(&self) -> Option { + pub(crate) fn get_download_all_remote_layers_task_info( + &self, + ) -> Option { self.download_all_remote_layers_task_info .read() .unwrap() @@ -4732,7 +4749,7 @@ fn layer_traversal_error(msg: String, path: Vec) -> PageRecon // TODO Currently, Deref is used to allow easy access to read methods from this trait. // This is probably considered a bad practice in Rust and should be fixed eventually, // but will cause large code changes. -pub struct TimelineWriter<'a> { +pub(crate) struct TimelineWriter<'a> { tl: &'a Timeline, _write_guard: tokio::sync::MutexGuard<'a, ()>, } @@ -4750,7 +4767,7 @@ impl<'a> TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - pub async fn put( + pub(crate) async fn put( &self, key: Key, lsn: Lsn, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 01a5bfc32b..9bdd52e809 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -319,6 +319,13 @@ impl Timeline { cancel: &CancellationToken, ctx: &RequestContext, ) -> ControlFlow<()> { + if !self.tenant_shard_id.is_zero() { + // Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size + // for consumption metrics (consumption metrics are only sent from shard 0). We may therefore + // skip imitating logical size accesses for eviction purposes. + return ControlFlow::Continue(()); + } + let mut state = self.eviction_task_timeline_state.lock().await; // Only do the imitate_layer accesses approximately as often as the threshold. A little diff --git a/pageserver/src/tenant/timeline/logical_size.rs b/pageserver/src/tenant/timeline/logical_size.rs index 03bc59ea38..8f9ca0e29f 100644 --- a/pageserver/src/tenant/timeline/logical_size.rs +++ b/pageserver/src/tenant/timeline/logical_size.rs @@ -101,6 +101,14 @@ impl From<&Exact> for u64 { } } +impl Approximate { + /// For use in situations where we don't have a sane logical size value but need + /// to return something, e.g. in HTTP API on shard >0 of a sharded tenant. + pub(crate) fn zero() -> Self { + Self(0) + } +} + impl CurrentLogicalSize { pub(crate) fn size_dont_care_about_accuracy(&self) -> u64 { match self { diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index e398d683e5..73eb42bb30 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -426,13 +426,21 @@ pub(super) async fn handle_walreceiver_connection( // Send the replication feedback message. // Regular standby_status_update fields are put into this message. - let current_timeline_size = timeline - .get_current_logical_size( - crate::tenant::timeline::GetLogicalSizePriority::User, - &ctx, - ) - // FIXME: https://github.com/neondatabase/neon/issues/5963 - .size_dont_care_about_accuracy(); + let current_timeline_size = if timeline.tenant_shard_id.is_zero() { + timeline + .get_current_logical_size( + crate::tenant::timeline::GetLogicalSizePriority::User, + &ctx, + ) + // FIXME: https://github.com/neondatabase/neon/issues/5963 + .size_dont_care_about_accuracy() + } else { + // Non-zero shards send zero for logical size. The safekeeper will ignore + // this number. This is because in a sharded tenant, only shard zero maintains + // accurate logical size. + 0 + }; + let status_update = PageserverFeedback { current_timeline_size, last_received_lsn,