mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
pageserver: replace the locked tenant config with arcsawps (#7292)
## Problem For reasons unrelated to this PR, I would like to make use of the tenant conf in the `InMemoryLayer`. Previously, this was not possible without copying and manually updating the copy to keep it in sync with updates. ## Summary of Changes: Replace the `Arc<RwLock<AttachedTenantConf>>` with `Arc<ArcSwap<AttachedTenantConf>>` (how many `Arc(s)` can one fit in a type?). The most interesting part of this change is the updating of the tenant config (`set_new_tenant_config` and `set_new_location_config`). In theory, these two may race, although the storage controller should prevent this via the tenant exclusive op lock. Particular care has been taken to not "lose" a location config update by using the read-copy-update approach when updating only the config.
This commit is contained in:
@@ -12,6 +12,7 @@
|
||||
//!
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use arc_swap::ArcSwap;
|
||||
use camino::Utf8Path;
|
||||
use camino::Utf8PathBuf;
|
||||
use enumset::EnumSet;
|
||||
@@ -98,7 +99,7 @@ use std::ops::Bound::Included;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::sync::Mutex;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::span;
|
||||
@@ -260,7 +261,7 @@ pub struct Tenant {
|
||||
// We keep TenantConfOpt sturct here to preserve the information
|
||||
// about parameters that are not set.
|
||||
// This is necessary to allow global config updates.
|
||||
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -1606,7 +1607,7 @@ impl Tenant {
|
||||
);
|
||||
|
||||
{
|
||||
let conf = self.tenant_conf.read().unwrap();
|
||||
let conf = self.tenant_conf.load();
|
||||
|
||||
if !conf.location.may_delete_layers_hint() {
|
||||
info!("Skipping GC in location state {:?}", conf.location);
|
||||
@@ -1633,7 +1634,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
{
|
||||
let conf = self.tenant_conf.read().unwrap();
|
||||
let conf = self.tenant_conf.load();
|
||||
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
|
||||
info!("Skipping compaction in location state {:?}", conf.location);
|
||||
return Ok(());
|
||||
@@ -2082,14 +2083,14 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
|
||||
self.tenant_conf.read().unwrap().location.attach_mode
|
||||
self.tenant_conf.load().location.attach_mode
|
||||
}
|
||||
|
||||
/// For API access: generate a LocationConfig equivalent to the one that would be used to
|
||||
/// create a Tenant in the same state. Do not use this in hot paths: it's for relatively
|
||||
/// rare external API calls, like a reconciliation at startup.
|
||||
pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
|
||||
let conf = self.tenant_conf.read().unwrap();
|
||||
let conf = self.tenant_conf.load();
|
||||
|
||||
let location_config_mode = match conf.location.attach_mode {
|
||||
AttachmentMode::Single => models::LocationConfigMode::AttachedSingle,
|
||||
@@ -2236,7 +2237,7 @@ where
|
||||
|
||||
impl Tenant {
|
||||
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
|
||||
self.tenant_conf.read().unwrap().tenant_conf.clone()
|
||||
self.tenant_conf.load().tenant_conf.clone()
|
||||
}
|
||||
|
||||
pub fn effective_config(&self) -> TenantConf {
|
||||
@@ -2245,84 +2246,84 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.checkpoint_distance
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
|
||||
}
|
||||
|
||||
pub fn get_checkpoint_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
pub fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_target_size
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
|
||||
}
|
||||
|
||||
pub fn get_compaction_period(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_period
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_period)
|
||||
}
|
||||
|
||||
pub fn get_compaction_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
|
||||
}
|
||||
|
||||
pub fn get_gc_horizon(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.gc_horizon
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_horizon)
|
||||
}
|
||||
|
||||
pub fn get_gc_period(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.gc_period
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_period)
|
||||
}
|
||||
|
||||
pub fn get_image_creation_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.image_creation_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
|
||||
}
|
||||
|
||||
pub fn get_pitr_interval(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.pitr_interval
|
||||
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
||||
}
|
||||
|
||||
pub fn get_trace_read_requests(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.trace_read_requests
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn get_min_resident_size_override(&self) -> Option<u64> {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.min_resident_size_override
|
||||
.or(self.conf.default_tenant_conf.min_resident_size_override)
|
||||
}
|
||||
|
||||
pub fn get_heatmap_period(&self) -> Option<Duration> {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
let heatmap_period = tenant_conf
|
||||
.heatmap_period
|
||||
.unwrap_or(self.conf.default_tenant_conf.heatmap_period);
|
||||
@@ -2334,26 +2335,40 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
|
||||
self.tenant_conf_updated();
|
||||
// Use read-copy-update in order to avoid overwriting the location config
|
||||
// state if this races with [`Tenant::set_new_location_config`]. Note that
|
||||
// this race is not possible if both request types come from the storage
|
||||
// controller (as they should!) because an exclusive op lock is required
|
||||
// on the storage controller side.
|
||||
self.tenant_conf.rcu(|inner| {
|
||||
Arc::new(AttachedTenantConf {
|
||||
tenant_conf: new_tenant_conf.clone(),
|
||||
location: inner.location,
|
||||
})
|
||||
});
|
||||
|
||||
self.tenant_conf_updated(&new_tenant_conf);
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated();
|
||||
timeline.tenant_conf_updated(&new_tenant_conf);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
|
||||
*self.tenant_conf.write().unwrap() = new_conf;
|
||||
self.tenant_conf_updated();
|
||||
let new_tenant_conf = new_conf.tenant_conf.clone();
|
||||
|
||||
self.tenant_conf.store(Arc::new(new_conf));
|
||||
|
||||
self.tenant_conf_updated(&new_tenant_conf);
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated();
|
||||
timeline.tenant_conf_updated(&new_tenant_conf);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2367,11 +2382,8 @@ impl Tenant {
|
||||
.unwrap_or(psconf.default_tenant_conf.timeline_get_throttle.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_conf_updated(&self) {
|
||||
let conf = {
|
||||
let guard = self.tenant_conf.read().unwrap();
|
||||
Self::get_timeline_get_throttle_config(self.conf, &guard.tenant_conf)
|
||||
};
|
||||
pub(crate) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
|
||||
let conf = Self::get_timeline_get_throttle_config(self.conf, new_conf);
|
||||
self.timeline_get_throttle.reconfigure(conf)
|
||||
}
|
||||
|
||||
@@ -2519,7 +2531,7 @@ impl Tenant {
|
||||
Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf),
|
||||
&crate::metrics::tenant_throttling::TIMELINE_GET,
|
||||
)),
|
||||
tenant_conf: Arc::new(RwLock::new(attached_conf)),
|
||||
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3505,7 +3517,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
|
||||
self.tenant_conf.read().unwrap().tenant_conf.clone()
|
||||
self.tenant_conf.load().tenant_conf.clone()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ pub mod uninit;
|
||||
mod walreceiver;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use arc_swap::ArcSwap;
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use enumset::EnumSet;
|
||||
@@ -183,7 +184,7 @@ pub(crate) struct AuxFilesState {
|
||||
|
||||
pub struct Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
|
||||
myself: Weak<Self>,
|
||||
|
||||
@@ -1588,57 +1589,65 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
|
||||
// Private functions
|
||||
impl Timeline {
|
||||
pub(crate) fn get_lazy_slru_download(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.lazy_slru_download
|
||||
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
|
||||
}
|
||||
|
||||
fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.checkpoint_distance
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
|
||||
}
|
||||
|
||||
fn get_checkpoint_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.compaction_target_size
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
|
||||
}
|
||||
|
||||
fn get_compaction_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.compaction_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
|
||||
}
|
||||
|
||||
fn get_image_creation_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.image_creation_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
|
||||
}
|
||||
|
||||
fn get_compaction_algorithm(&self) -> CompactionAlgorithm {
|
||||
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = &self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.compaction_algorithm
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_algorithm)
|
||||
}
|
||||
|
||||
fn get_eviction_policy(&self) -> EvictionPolicy {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.eviction_policy
|
||||
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
|
||||
}
|
||||
@@ -1653,22 +1662,25 @@ impl Timeline {
|
||||
}
|
||||
|
||||
fn get_image_layer_creation_check_threshold(&self) -> u8 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf.image_layer_creation_check_threshold.unwrap_or(
|
||||
self.conf
|
||||
.default_tenant_conf
|
||||
.image_layer_creation_check_threshold,
|
||||
)
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.image_layer_creation_check_threshold
|
||||
.unwrap_or(
|
||||
self.conf
|
||||
.default_tenant_conf
|
||||
.image_layer_creation_check_threshold,
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self) {
|
||||
pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
|
||||
// The threshold is embedded in the metric. So, we need to update it.
|
||||
{
|
||||
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&self.tenant_conf.read().unwrap().tenant_conf,
|
||||
new_conf,
|
||||
&self.conf.default_tenant_conf,
|
||||
);
|
||||
|
||||
@@ -1695,7 +1707,7 @@ impl Timeline {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
metadata: &TimelineMetadata,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
timeline_id: TimelineId,
|
||||
@@ -1714,14 +1726,13 @@ impl Timeline {
|
||||
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
|
||||
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
|
||||
|
||||
let tenant_conf_guard = tenant_conf.read().unwrap();
|
||||
|
||||
let evictions_low_residence_duration_metric_threshold =
|
||||
let evictions_low_residence_duration_metric_threshold = {
|
||||
let loaded_tenant_conf = tenant_conf.load();
|
||||
Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&tenant_conf_guard.tenant_conf,
|
||||
&loaded_tenant_conf.tenant_conf,
|
||||
&conf.default_tenant_conf,
|
||||
);
|
||||
drop(tenant_conf_guard);
|
||||
)
|
||||
};
|
||||
|
||||
Arc::new_cyclic(|myself| {
|
||||
let mut result = Timeline {
|
||||
@@ -1904,20 +1915,19 @@ impl Timeline {
|
||||
self.timeline_id, self.tenant_shard_id
|
||||
);
|
||||
|
||||
let tenant_conf_guard = self.tenant_conf.read().unwrap();
|
||||
let wal_connect_timeout = tenant_conf_guard
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
let wal_connect_timeout = tenant_conf
|
||||
.tenant_conf
|
||||
.walreceiver_connect_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
|
||||
let lagging_wal_timeout = tenant_conf_guard
|
||||
let lagging_wal_timeout = tenant_conf
|
||||
.tenant_conf
|
||||
.lagging_wal_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
|
||||
let max_lsn_wal_lag = tenant_conf_guard
|
||||
let max_lsn_wal_lag = tenant_conf
|
||||
.tenant_conf
|
||||
.max_lsn_wal_lag
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
|
||||
drop(tenant_conf_guard);
|
||||
|
||||
let mut guard = self.walreceiver.lock().unwrap();
|
||||
assert!(
|
||||
|
||||
Reference in New Issue
Block a user