mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Merge branch 'problame/2024-02-walredo-work/prespawn/split-code' into problame/2024-02-walredo-work/prespawn/broken-tenants-no-walredo
This commit is contained in:
@@ -124,7 +124,7 @@ pub(super) enum FlushLoopState {
|
||||
|
||||
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Hole {
|
||||
pub(crate) struct Hole {
|
||||
key_range: Range<Key>,
|
||||
coverage_size: usize,
|
||||
}
|
||||
@@ -565,19 +565,19 @@ impl From<GetReadyAncestorError> for PageReconstructError {
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
pub fn get_ancestor_lsn(&self) -> Lsn {
|
||||
pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
|
||||
self.ancestor_lsn
|
||||
}
|
||||
|
||||
/// Get the ancestor's timeline id
|
||||
pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
|
||||
pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
|
||||
self.ancestor_timeline
|
||||
.as_ref()
|
||||
.map(|ancestor| ancestor.timeline_id)
|
||||
}
|
||||
|
||||
/// Lock and get timeline's GC cutoff
|
||||
pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
self.latest_gc_cutoff_lsn.read()
|
||||
}
|
||||
|
||||
@@ -733,27 +733,27 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
pub fn get_last_record_lsn(&self) -> Lsn {
|
||||
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().last
|
||||
}
|
||||
|
||||
pub fn get_prev_record_lsn(&self) -> Lsn {
|
||||
pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().prev
|
||||
}
|
||||
|
||||
/// Atomically get both last and prev.
|
||||
pub fn get_last_record_rlsn(&self) -> RecordLsn {
|
||||
pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
|
||||
self.last_record_lsn.load()
|
||||
}
|
||||
|
||||
pub fn get_disk_consistent_lsn(&self) -> Lsn {
|
||||
pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
|
||||
self.disk_consistent_lsn.load()
|
||||
}
|
||||
|
||||
/// remote_consistent_lsn from the perspective of the tenant's current generation,
|
||||
/// not validated with control plane yet.
|
||||
/// See [`Self::get_remote_consistent_lsn_visible`].
|
||||
pub fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
|
||||
pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.remote_consistent_lsn_projected()
|
||||
} else {
|
||||
@@ -764,7 +764,7 @@ impl Timeline {
|
||||
/// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
|
||||
/// i.e. a value of remote_consistent_lsn_projected which has undergone
|
||||
/// generation validation in the deletion queue.
|
||||
pub fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
|
||||
pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.remote_consistent_lsn_visible()
|
||||
} else {
|
||||
@@ -775,7 +775,7 @@ impl Timeline {
|
||||
/// The sum of the file size of all historic layers in the layer map.
|
||||
/// This method makes no distinction between local and remote layers.
|
||||
/// Hence, the result **does not represent local filesystem usage**.
|
||||
pub async fn layer_size_sum(&self) -> u64 {
|
||||
pub(crate) async fn layer_size_sum(&self) -> u64 {
|
||||
let guard = self.layers.read().await;
|
||||
let layer_map = guard.layer_map();
|
||||
let mut size = 0;
|
||||
@@ -785,7 +785,7 @@ impl Timeline {
|
||||
size
|
||||
}
|
||||
|
||||
pub fn resident_physical_size(&self) -> u64 {
|
||||
pub(crate) fn resident_physical_size(&self) -> u64 {
|
||||
self.metrics.resident_physical_size_get()
|
||||
}
|
||||
|
||||
@@ -861,7 +861,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Check that it is valid to request operations with that lsn.
|
||||
pub fn check_lsn_is_in_scope(
|
||||
pub(crate) fn check_lsn_is_in_scope(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
@@ -877,7 +877,7 @@ impl Timeline {
|
||||
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
|
||||
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
self.freeze_inmem_layer(false).await;
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
}
|
||||
@@ -1021,7 +1021,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Mutate the timeline with a [`TimelineWriter`].
|
||||
pub async fn writer(&self) -> TimelineWriter<'_> {
|
||||
pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
|
||||
TimelineWriter {
|
||||
tl: self,
|
||||
_write_guard: self.write_lock.lock().await,
|
||||
@@ -1033,7 +1033,7 @@ impl Timeline {
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let open_layer_size = {
|
||||
let guard = self.layers.read().await;
|
||||
@@ -1071,13 +1071,16 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn activate(
|
||||
pub(crate) fn activate(
|
||||
self: &Arc<Self>,
|
||||
broker_client: BrokerClientChannel,
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
self.spawn_initial_logical_size_computation_task(ctx);
|
||||
if self.tenant_shard_id.is_zero() {
|
||||
// Logical size is only maintained accurately on shard zero.
|
||||
self.spawn_initial_logical_size_computation_task(ctx);
|
||||
}
|
||||
self.launch_wal_receiver(ctx, broker_client);
|
||||
self.set_state(TimelineState::Active);
|
||||
self.launch_eviction_task(background_jobs_can_start);
|
||||
@@ -1172,7 +1175,7 @@ impl Timeline {
|
||||
self.gate.close().await;
|
||||
}
|
||||
|
||||
pub fn set_state(&self, new_state: TimelineState) {
|
||||
pub(crate) fn set_state(&self, new_state: TimelineState) {
|
||||
match (self.current_state(), new_state) {
|
||||
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
|
||||
info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
|
||||
@@ -1192,7 +1195,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_broken(&self, reason: String) {
|
||||
pub(crate) fn set_broken(&self, reason: String) {
|
||||
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
|
||||
let broken_state = TimelineState::Broken {
|
||||
reason,
|
||||
@@ -1206,27 +1209,27 @@ impl Timeline {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
|
||||
pub fn current_state(&self) -> TimelineState {
|
||||
pub(crate) fn current_state(&self) -> TimelineState {
|
||||
self.state.borrow().clone()
|
||||
}
|
||||
|
||||
pub fn is_broken(&self) -> bool {
|
||||
pub(crate) fn is_broken(&self) -> bool {
|
||||
matches!(&*self.state.borrow(), TimelineState::Broken { .. })
|
||||
}
|
||||
|
||||
pub fn is_active(&self) -> bool {
|
||||
pub(crate) fn is_active(&self) -> bool {
|
||||
self.current_state() == TimelineState::Active
|
||||
}
|
||||
|
||||
pub fn is_stopping(&self) -> bool {
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
self.current_state() == TimelineState::Stopping
|
||||
}
|
||||
|
||||
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
|
||||
pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
|
||||
self.state.subscribe()
|
||||
}
|
||||
|
||||
pub async fn wait_to_become_active(
|
||||
pub(crate) async fn wait_to_become_active(
|
||||
&self,
|
||||
_ctx: &RequestContext, // Prepare for use by cancellation
|
||||
) -> Result<(), TimelineState> {
|
||||
@@ -1251,7 +1254,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let guard = self.layers.read().await;
|
||||
let layer_map = guard.layer_map();
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
@@ -1275,7 +1278,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
|
||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
pub(crate) async fn download_layer(
|
||||
&self,
|
||||
layer_file_name: &str,
|
||||
) -> anyhow::Result<Option<bool>> {
|
||||
let Some(layer) = self.find_layer(layer_file_name).await else {
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -1292,7 +1298,7 @@ impl Timeline {
|
||||
/// Evict just one layer.
|
||||
///
|
||||
/// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let _gate = self
|
||||
.gate
|
||||
.enter()
|
||||
@@ -1315,7 +1321,7 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
|
||||
|
||||
// Private functions
|
||||
impl Timeline {
|
||||
pub fn get_lazy_slru_download(&self) -> bool {
|
||||
pub(crate) fn get_lazy_slru_download(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
tenant_conf
|
||||
.lazy_slru_download
|
||||
@@ -1852,6 +1858,12 @@ impl Timeline {
|
||||
priority: GetLogicalSizePriority,
|
||||
ctx: &RequestContext,
|
||||
) -> logical_size::CurrentLogicalSize {
|
||||
if !self.tenant_shard_id.is_zero() {
|
||||
// Logical size is only accurately maintained on shard zero: when called elsewhere, for example
|
||||
// when HTTP API is serving a GET for timeline zero, return zero
|
||||
return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
|
||||
}
|
||||
|
||||
let current_size = self.current_logical_size.current_size();
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
@@ -2094,7 +2106,7 @@ impl Timeline {
|
||||
.expect("only this task sets it");
|
||||
}
|
||||
|
||||
pub fn spawn_ondemand_logical_size_calculation(
|
||||
pub(crate) fn spawn_ondemand_logical_size_calculation(
|
||||
self: &Arc<Self>,
|
||||
lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
@@ -2140,6 +2152,9 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
// We should never be calculating logical sizes on shard !=0, because these shards do not have
|
||||
// accurate relation sizes, and they do not emit consumption metrics.
|
||||
debug_assert!(self.tenant_shard_id.is_zero());
|
||||
|
||||
let _guard = self.gate.enter();
|
||||
|
||||
@@ -2173,7 +2188,7 @@ impl Timeline {
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
pub async fn calculate_logical_size(
|
||||
async fn calculate_logical_size(
|
||||
&self,
|
||||
up_to_lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
@@ -3422,7 +3437,7 @@ enum DurationRecorder {
|
||||
}
|
||||
|
||||
impl DurationRecorder {
|
||||
pub fn till_now(&self) -> DurationRecorder {
|
||||
fn till_now(&self) -> DurationRecorder {
|
||||
match self {
|
||||
DurationRecorder::NotStarted => {
|
||||
panic!("must only call on recorded measurements")
|
||||
@@ -3433,7 +3448,7 @@ impl DurationRecorder {
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn into_recorded(self) -> Option<RecordedDuration> {
|
||||
fn into_recorded(self) -> Option<RecordedDuration> {
|
||||
match self {
|
||||
DurationRecorder::NotStarted => None,
|
||||
DurationRecorder::Recorded(recorded, _) => Some(recorded),
|
||||
@@ -4636,7 +4651,9 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
|
||||
pub(crate) fn get_download_all_remote_layers_task_info(
|
||||
&self,
|
||||
) -> Option<DownloadRemoteLayersTaskInfo> {
|
||||
self.download_all_remote_layers_task_info
|
||||
.read()
|
||||
.unwrap()
|
||||
@@ -4732,7 +4749,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
|
||||
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
|
||||
// This is probably considered a bad practice in Rust and should be fixed eventually,
|
||||
// but will cause large code changes.
|
||||
pub struct TimelineWriter<'a> {
|
||||
pub(crate) struct TimelineWriter<'a> {
|
||||
tl: &'a Timeline,
|
||||
_write_guard: tokio::sync::MutexGuard<'a, ()>,
|
||||
}
|
||||
@@ -4750,7 +4767,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
pub async fn put(
|
||||
pub(crate) async fn put(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
|
||||
@@ -319,6 +319,13 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<()> {
|
||||
if !self.tenant_shard_id.is_zero() {
|
||||
// Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size
|
||||
// for consumption metrics (consumption metrics are only sent from shard 0). We may therefore
|
||||
// skip imitating logical size accesses for eviction purposes.
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
|
||||
let mut state = self.eviction_task_timeline_state.lock().await;
|
||||
|
||||
// Only do the imitate_layer accesses approximately as often as the threshold. A little
|
||||
|
||||
@@ -101,6 +101,14 @@ impl From<&Exact> for u64 {
|
||||
}
|
||||
}
|
||||
|
||||
impl Approximate {
|
||||
/// For use in situations where we don't have a sane logical size value but need
|
||||
/// to return something, e.g. in HTTP API on shard >0 of a sharded tenant.
|
||||
pub(crate) fn zero() -> Self {
|
||||
Self(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl CurrentLogicalSize {
|
||||
pub(crate) fn size_dont_care_about_accuracy(&self) -> u64 {
|
||||
match self {
|
||||
|
||||
@@ -426,13 +426,21 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
// Send the replication feedback message.
|
||||
// Regular standby_status_update fields are put into this message.
|
||||
let current_timeline_size = timeline
|
||||
.get_current_logical_size(
|
||||
crate::tenant::timeline::GetLogicalSizePriority::User,
|
||||
&ctx,
|
||||
)
|
||||
// FIXME: https://github.com/neondatabase/neon/issues/5963
|
||||
.size_dont_care_about_accuracy();
|
||||
let current_timeline_size = if timeline.tenant_shard_id.is_zero() {
|
||||
timeline
|
||||
.get_current_logical_size(
|
||||
crate::tenant::timeline::GetLogicalSizePriority::User,
|
||||
&ctx,
|
||||
)
|
||||
// FIXME: https://github.com/neondatabase/neon/issues/5963
|
||||
.size_dont_care_about_accuracy()
|
||||
} else {
|
||||
// Non-zero shards send zero for logical size. The safekeeper will ignore
|
||||
// this number. This is because in a sharded tenant, only shard zero maintains
|
||||
// accurate logical size.
|
||||
0
|
||||
};
|
||||
|
||||
let status_update = PageserverFeedback {
|
||||
current_timeline_size,
|
||||
last_received_lsn,
|
||||
|
||||
Reference in New Issue
Block a user