Merge branch 'main' into problame/2024-02-walredo-work/prespawn/split-code

This commit is contained in:
Christian Schwarz
2024-02-02 15:53:55 +00:00
4 changed files with 84 additions and 44 deletions

View File

@@ -124,7 +124,7 @@ pub(super) enum FlushLoopState {
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Hole {
pub(crate) struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
@@ -565,19 +565,19 @@ impl From<GetReadyAncestorError> for PageReconstructError {
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
pub fn get_ancestor_lsn(&self) -> Lsn {
pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
}
/// Get the ancestor's timeline id
pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
self.ancestor_timeline
.as_ref()
.map(|ancestor| ancestor.timeline_id)
}
/// Lock and get timeline's GC cutoff
pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
self.latest_gc_cutoff_lsn.read()
}
@@ -733,27 +733,27 @@ impl Timeline {
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub fn get_last_record_lsn(&self) -> Lsn {
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
}
pub fn get_prev_record_lsn(&self) -> Lsn {
pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().prev
}
/// Atomically get both last and prev.
pub fn get_last_record_rlsn(&self) -> RecordLsn {
pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
self.last_record_lsn.load()
}
pub fn get_disk_consistent_lsn(&self) -> Lsn {
pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn.load()
}
/// remote_consistent_lsn from the perspective of the tenant's current generation,
/// not validated with control plane yet.
/// See [`Self::get_remote_consistent_lsn_visible`].
pub fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
if let Some(remote_client) = &self.remote_client {
remote_client.remote_consistent_lsn_projected()
} else {
@@ -764,7 +764,7 @@ impl Timeline {
/// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
/// i.e. a value of remote_consistent_lsn_projected which has undergone
/// generation validation in the deletion queue.
pub fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
if let Some(remote_client) = &self.remote_client {
remote_client.remote_consistent_lsn_visible()
} else {
@@ -775,7 +775,7 @@ impl Timeline {
/// The sum of the file size of all historic layers in the layer map.
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub async fn layer_size_sum(&self) -> u64 {
pub(crate) async fn layer_size_sum(&self) -> u64 {
let guard = self.layers.read().await;
let layer_map = guard.layer_map();
let mut size = 0;
@@ -785,7 +785,7 @@ impl Timeline {
size
}
pub fn resident_physical_size(&self) -> u64 {
pub(crate) fn resident_physical_size(&self) -> u64 {
self.metrics.resident_physical_size_get()
}
@@ -861,7 +861,7 @@ impl Timeline {
}
/// Check that it is valid to request operations with that lsn.
pub fn check_lsn_is_in_scope(
pub(crate) fn check_lsn_is_in_scope(
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
@@ -877,7 +877,7 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait().await
}
@@ -1021,7 +1021,7 @@ impl Timeline {
}
/// Mutate the timeline with a [`TimelineWriter`].
pub async fn writer(&self) -> TimelineWriter<'_> {
pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().await,
@@ -1033,7 +1033,7 @@ impl Timeline {
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let open_layer_size = {
let guard = self.layers.read().await;
@@ -1071,13 +1071,16 @@ impl Timeline {
Ok(())
}
pub fn activate(
pub(crate) fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
self.spawn_initial_logical_size_computation_task(ctx);
if self.tenant_shard_id.is_zero() {
// Logical size is only maintained accurately on shard zero.
self.spawn_initial_logical_size_computation_task(ctx);
}
self.launch_wal_receiver(ctx, broker_client);
self.set_state(TimelineState::Active);
self.launch_eviction_task(background_jobs_can_start);
@@ -1172,7 +1175,7 @@ impl Timeline {
self.gate.close().await;
}
pub fn set_state(&self, new_state: TimelineState) {
pub(crate) fn set_state(&self, new_state: TimelineState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
@@ -1192,7 +1195,7 @@ impl Timeline {
}
}
pub fn set_broken(&self, reason: String) {
pub(crate) fn set_broken(&self, reason: String) {
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
let broken_state = TimelineState::Broken {
reason,
@@ -1206,27 +1209,27 @@ impl Timeline {
self.cancel.cancel();
}
pub fn current_state(&self) -> TimelineState {
pub(crate) fn current_state(&self) -> TimelineState {
self.state.borrow().clone()
}
pub fn is_broken(&self) -> bool {
pub(crate) fn is_broken(&self) -> bool {
matches!(&*self.state.borrow(), TimelineState::Broken { .. })
}
pub fn is_active(&self) -> bool {
pub(crate) fn is_active(&self) -> bool {
self.current_state() == TimelineState::Active
}
pub fn is_stopping(&self) -> bool {
pub(crate) fn is_stopping(&self) -> bool {
self.current_state() == TimelineState::Stopping
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
self.state.subscribe()
}
pub async fn wait_to_become_active(
pub(crate) async fn wait_to_become_active(
&self,
_ctx: &RequestContext, // Prepare for use by cancellation
) -> Result<(), TimelineState> {
@@ -1251,7 +1254,7 @@ impl Timeline {
}
}
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let guard = self.layers.read().await;
let layer_map = guard.layer_map();
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
@@ -1275,7 +1278,10 @@ impl Timeline {
}
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
pub(crate) async fn download_layer(
&self,
layer_file_name: &str,
) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name).await else {
return Ok(None);
};
@@ -1292,7 +1298,7 @@ impl Timeline {
/// Evict just one layer.
///
/// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let _gate = self
.gate
.enter()
@@ -1315,7 +1321,7 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
pub fn get_lazy_slru_download(&self) -> bool {
pub(crate) fn get_lazy_slru_download(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.lazy_slru_download
@@ -1852,6 +1858,12 @@ impl Timeline {
priority: GetLogicalSizePriority,
ctx: &RequestContext,
) -> logical_size::CurrentLogicalSize {
if !self.tenant_shard_id.is_zero() {
// Logical size is only accurately maintained on shard zero: when called on any other shard, for example
// when the HTTP API is serving a GET for a timeline on a non-zero shard, return an approximate size of zero
return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
}
let current_size = self.current_logical_size.current_size();
debug!("Current size: {current_size:?}");
@@ -2094,7 +2106,7 @@ impl Timeline {
.expect("only this task sets it");
}
pub fn spawn_ondemand_logical_size_calculation(
pub(crate) fn spawn_ondemand_logical_size_calculation(
self: &Arc<Self>,
lsn: Lsn,
cause: LogicalSizeCalculationCause,
@@ -2140,6 +2152,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
span::debug_assert_current_span_has_tenant_and_timeline_id();
// We should never be calculating logical sizes on shard !=0, because these shards do not have
// accurate relation sizes, and they do not emit consumption metrics.
debug_assert!(self.tenant_shard_id.is_zero());
let _guard = self.gate.enter();
@@ -2173,7 +2188,7 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn calculate_logical_size(
async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
cause: LogicalSizeCalculationCause,
@@ -3422,7 +3437,7 @@ enum DurationRecorder {
}
impl DurationRecorder {
pub fn till_now(&self) -> DurationRecorder {
fn till_now(&self) -> DurationRecorder {
match self {
DurationRecorder::NotStarted => {
panic!("must only call on recorded measurements")
@@ -3433,7 +3448,7 @@ impl DurationRecorder {
}
}
}
pub fn into_recorded(self) -> Option<RecordedDuration> {
fn into_recorded(self) -> Option<RecordedDuration> {
match self {
DurationRecorder::NotStarted => None,
DurationRecorder::Recorded(recorded, _) => Some(recorded),
@@ -4633,7 +4648,9 @@ impl Timeline {
}
}
pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
pub(crate) fn get_download_all_remote_layers_task_info(
&self,
) -> Option<DownloadRemoteLayersTaskInfo> {
self.download_all_remote_layers_task_info
.read()
.unwrap()
@@ -4729,7 +4746,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
// This is probably considered a bad practice in Rust and should be fixed eventually,
// but will cause large code changes.
pub struct TimelineWriter<'a> {
pub(crate) struct TimelineWriter<'a> {
tl: &'a Timeline,
_write_guard: tokio::sync::MutexGuard<'a, ()>,
}
@@ -4747,7 +4764,7 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub async fn put(
pub(crate) async fn put(
&self,
key: Key,
lsn: Lsn,

View File

@@ -319,6 +319,13 @@ impl Timeline {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> ControlFlow<()> {
if !self.tenant_shard_id.is_zero() {
// Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size
// for consumption metrics (consumption metrics are only sent from shard 0). We may therefore
// skip imitating logical size accesses for eviction purposes.
return ControlFlow::Continue(());
}
let mut state = self.eviction_task_timeline_state.lock().await;
// Only do the imitate_layer accesses approximately as often as the threshold. A little

View File

@@ -101,6 +101,14 @@ impl From<&Exact> for u64 {
}
}
impl Approximate {
/// For use in situations where we don't have a sane logical size value but need
/// to return something, e.g. in HTTP API on shard >0 of a sharded tenant.
pub(crate) fn zero() -> Self {
Self(0)
}
}
impl CurrentLogicalSize {
pub(crate) fn size_dont_care_about_accuracy(&self) -> u64 {
match self {

View File

@@ -426,13 +426,21 @@ pub(super) async fn handle_walreceiver_connection(
// Send the replication feedback message.
// Regular standby_status_update fields are put into this message.
let current_timeline_size = timeline
.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::User,
&ctx,
)
// FIXME: https://github.com/neondatabase/neon/issues/5963
.size_dont_care_about_accuracy();
let current_timeline_size = if timeline.tenant_shard_id.is_zero() {
timeline
.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::User,
&ctx,
)
// FIXME: https://github.com/neondatabase/neon/issues/5963
.size_dont_care_about_accuracy()
} else {
// Non-zero shards send zero for logical size. The safekeeper will ignore
// this number. This is because in a sharded tenant, only shard zero maintains
// accurate logical size.
0
};
let status_update = PageserverFeedback {
current_timeline_size,
last_received_lsn,