Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-03 19:42:55 +00:00)
storcon: use Duration for durations in the storage controller tenant config (#10928)

## Problem

The storage controller treats durations in the tenant config as strings, loaded as-is from the db. The pageserver maps these durations to a seconds-only format, so we always get a mismatch compared to what's in the db.

## Summary of changes

Treat durations as durations inside the storage controller, not as strings. Nothing changes in the cross-service APIs themselves or in the way things are stored in the db.

I also added some logging which would have made the investigation a 10-minute job:
1. The reason why the reconciliation was spawned
2. A location config diff between the observed and wanted states
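To illustrate the mismatch, a minimal sketch (not code from this patch; it only assumes the `humantime` crate that the diff below uses):

```rust
use std::time::Duration;

fn main() {
    // A duration stored in the db as written by an operator.
    let db_value = "10m";
    let parsed: Duration = humantime::parse_duration(db_value).unwrap();

    // The pageserver reports durations in a seconds-only format.
    let reported = format!("{}s", parsed.as_secs()); // "600s"

    // String comparison flags a spurious diff even though both mean
    // ten minutes; comparing parsed Durations does not.
    assert_ne!(db_value, reported);
    assert_eq!(parsed, humantime::parse_duration(&reported).unwrap());
}
```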
Cargo.lock (generated): 18 lines changed

@@ -1874,6 +1874,12 @@ dependencies = [
  "syn 2.0.90",
 ]
 
+[[package]]
+name = "difflib"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
+
 [[package]]
 name = "digest"
 version = "0.10.7"

@@ -3331,6 +3337,17 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "json-structural-diff"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e878e36a8a44c158505c2c818abdc1350413ad83dcb774a0459f6a7ef2b65cbf"
+dependencies = [
+ "difflib",
+ "regex",
+ "serde_json",
+]
+
 [[package]]
 name = "jsonwebtoken"
 version = "9.2.0"

@@ -6443,6 +6460,7 @@ dependencies = [
  "humantime",
  "hyper 0.14.30",
  "itertools 0.10.5",
+ "json-structural-diff",
  "lasso",
  "measured",
  "metrics",
@@ -210,6 +210,7 @@ rustls-native-certs = "0.8"
 x509-parser = "0.16"
 whoami = "1.5.1"
 zerocopy = { version = "0.7", features = ["derive"] }
+json-structural-diff = { version = "0.2.0" }
 
 ## TODO replace this with tracing
 env_logger = "0.10"
@@ -335,13 +335,21 @@ impl PageServerNode {
         .map(|x| x.parse::<u64>())
         .transpose()
         .context("Failed to parse 'checkpoint_distance' as an integer")?,
-    checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
+    checkpoint_timeout: settings
+        .remove("checkpoint_timeout")
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'checkpoint_timeout' as duration")?,
     compaction_target_size: settings
         .remove("compaction_target_size")
         .map(|x| x.parse::<u64>())
         .transpose()
         .context("Failed to parse 'compaction_target_size' as an integer")?,
-    compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
+    compaction_period: settings
+        .remove("compaction_period")
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'compaction_period' as duration")?,
     compaction_threshold: settings
         .remove("compaction_threshold")
         .map(|x| x.parse::<usize>())

@@ -387,7 +395,10 @@ impl PageServerNode {
         .map(|x| x.parse::<u64>())
         .transpose()
         .context("Failed to parse 'gc_horizon' as an integer")?,
-    gc_period: settings.remove("gc_period").map(|x| x.to_string()),
+    gc_period: settings.remove("gc_period")
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'gc_period' as duration")?,
     image_creation_threshold: settings
         .remove("image_creation_threshold")
         .map(|x| x.parse::<usize>())

@@ -403,13 +414,20 @@ impl PageServerNode {
         .map(|x| x.parse::<usize>())
         .transpose()
         .context("Failed to parse 'image_creation_preempt_threshold' as integer")?,
-    pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
+    pitr_interval: settings.remove("pitr_interval")
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'pitr_interval' as duration")?,
     walreceiver_connect_timeout: settings
         .remove("walreceiver_connect_timeout")
-        .map(|x| x.to_string()),
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'walreceiver_connect_timeout' as duration")?,
     lagging_wal_timeout: settings
         .remove("lagging_wal_timeout")
-        .map(|x| x.to_string()),
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'lagging_wal_timeout' as duration")?,
     max_lsn_wal_lag: settings
         .remove("max_lsn_wal_lag")
         .map(|x| x.parse::<NonZeroU64>())

@@ -427,8 +445,14 @@ impl PageServerNode {
         .context("Failed to parse 'min_resident_size_override' as integer")?,
     evictions_low_residence_duration_metric_threshold: settings
         .remove("evictions_low_residence_duration_metric_threshold")
-        .map(|x| x.to_string()),
-    heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'evictions_low_residence_duration_metric_threshold' as duration")?,
+    heatmap_period: settings
+        .remove("heatmap_period")
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'heatmap_period' as duration")?,
     lazy_slru_download: settings
         .remove("lazy_slru_download")
         .map(|x| x.parse::<bool>())

@@ -439,10 +463,15 @@ impl PageServerNode {
         .map(serde_json::from_str)
         .transpose()
         .context("parse `timeline_get_throttle` from json")?,
-    lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
+    lsn_lease_length: settings.remove("lsn_lease_length")
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'lsn_lease_length' as duration")?,
     lsn_lease_length_for_ts: settings
         .remove("lsn_lease_length_for_ts")
-        .map(|x| x.to_string()),
+        .map(humantime::parse_duration)
+        .transpose()
+        .context("Failed to parse 'lsn_lease_length_for_ts' as duration")?,
     timeline_offloading: settings
         .remove("timeline_offloading")
         .map(|x| x.parse::<bool>())
@@ -959,7 +959,7 @@ async fn main() -> anyhow::Result<()> {
             threshold: threshold.into(),
         },
     )),
-    heatmap_period: Some("300s".to_string()),
+    heatmap_period: Some(Duration::from_secs(300)),
     ..Default::default()
 },
 })
@@ -526,9 +526,13 @@ pub struct TenantConfigPatch {
 #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
 pub struct TenantConfig {
     pub checkpoint_distance: Option<u64>,
-    pub checkpoint_timeout: Option<String>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub checkpoint_timeout: Option<Duration>,
     pub compaction_target_size: Option<u64>,
-    pub compaction_period: Option<String>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub compaction_period: Option<Duration>,
     pub compaction_threshold: Option<usize>,
     pub compaction_upper_limit: Option<usize>,
     // defer parsing compaction_algorithm, like eviction_policy

@@ -539,22 +543,38 @@ pub struct TenantConfig {
     pub l0_flush_stall_threshold: Option<usize>,
     pub l0_flush_wait_upload: Option<bool>,
     pub gc_horizon: Option<u64>,
-    pub gc_period: Option<String>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub gc_period: Option<Duration>,
     pub image_creation_threshold: Option<usize>,
-    pub pitr_interval: Option<String>,
-    pub walreceiver_connect_timeout: Option<String>,
-    pub lagging_wal_timeout: Option<String>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub pitr_interval: Option<Duration>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub walreceiver_connect_timeout: Option<Duration>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub lagging_wal_timeout: Option<Duration>,
     pub max_lsn_wal_lag: Option<NonZeroU64>,
     pub eviction_policy: Option<EvictionPolicy>,
     pub min_resident_size_override: Option<u64>,
-    pub evictions_low_residence_duration_metric_threshold: Option<String>,
-    pub heatmap_period: Option<String>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub heatmap_period: Option<Duration>,
     pub lazy_slru_download: Option<bool>,
     pub timeline_get_throttle: Option<ThrottleConfig>,
     pub image_layer_creation_check_threshold: Option<u8>,
     pub image_creation_preempt_threshold: Option<usize>,
-    pub lsn_lease_length: Option<String>,
-    pub lsn_lease_length_for_ts: Option<String>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub lsn_lease_length: Option<Duration>,
+    #[serde(default)]
+    #[serde(with = "humantime_serde")]
+    pub lsn_lease_length_for_ts: Option<Duration>,
    pub timeline_offloading: Option<bool>,
    pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
    pub rel_size_v2_enabled: Option<bool>,
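The struct change above leans on the `humantime_serde` crate to keep the wire format human-readable while the in-memory type becomes `Duration`. A minimal sketch of that round-trip, using a standalone toy struct rather than the real `TenantConfig`:

```rust
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Conf {
    // Same attribute pattern as the patch: an absent field defaults to None,
    // a present field is parsed/printed via humantime ("1h2m3s" style).
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    period: Option<Duration>,
}

fn main() {
    let c: Conf = serde_json::from_str(r#"{"period":"1h2m3s"}"#).unwrap();
    assert_eq!(c.period, Some(Duration::from_secs(3723)));

    // Serializes back through humantime's formatter rather than a bare
    // integer, so the JSON stays human-readable and round-trips cleanly.
    let json = serde_json::to_string(&c).unwrap();
    assert_eq!(c, serde_json::from_str(&json).unwrap());
}
```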
@@ -564,7 +584,10 @@ pub struct TenantConfig {
 }
 
 impl TenantConfig {
-    pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
+    pub fn apply_patch(
+        self,
+        patch: TenantConfigPatch,
+    ) -> Result<TenantConfig, humantime::DurationError> {
         let Self {
             mut checkpoint_distance,
             mut checkpoint_timeout,

@@ -604,11 +627,17 @@ impl TenantConfig {
         } = self;
 
         patch.checkpoint_distance.apply(&mut checkpoint_distance);
-        patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
+        patch
+            .checkpoint_timeout
+            .map(|v| humantime::parse_duration(&v))?
+            .apply(&mut checkpoint_timeout);
         patch
             .compaction_target_size
             .apply(&mut compaction_target_size);
-        patch.compaction_period.apply(&mut compaction_period);
+        patch
+            .compaction_period
+            .map(|v| humantime::parse_duration(&v))?
+            .apply(&mut compaction_period);
         patch.compaction_threshold.apply(&mut compaction_threshold);
         patch
             .compaction_upper_limit

@@ -626,15 +655,25 @@ impl TenantConfig {
             .apply(&mut l0_flush_stall_threshold);
         patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
         patch.gc_horizon.apply(&mut gc_horizon);
-        patch.gc_period.apply(&mut gc_period);
+        patch
+            .gc_period
+            .map(|v| humantime::parse_duration(&v))?
+            .apply(&mut gc_period);
         patch
             .image_creation_threshold
             .apply(&mut image_creation_threshold);
-        patch.pitr_interval.apply(&mut pitr_interval);
+        patch
+            .pitr_interval
+            .map(|v| humantime::parse_duration(&v))?
+            .apply(&mut pitr_interval);
         patch
             .walreceiver_connect_timeout
+            .map(|v| humantime::parse_duration(&v))?
             .apply(&mut walreceiver_connect_timeout);
-        patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
+        patch
+            .lagging_wal_timeout
+            .map(|v| humantime::parse_duration(&v))?
+            .apply(&mut lagging_wal_timeout);
         patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
         patch.eviction_policy.apply(&mut eviction_policy);
         patch

@@ -642,8 +681,12 @@ impl TenantConfig {
             .apply(&mut min_resident_size_override);
         patch
             .evictions_low_residence_duration_metric_threshold
+            .map(|v| humantime::parse_duration(&v))?
             .apply(&mut evictions_low_residence_duration_metric_threshold);
-        patch.heatmap_period.apply(&mut heatmap_period);
+        patch
+            .heatmap_period
+            .map(|v| humantime::parse_duration(&v))?
+            .apply(&mut heatmap_period);
         patch.lazy_slru_download.apply(&mut lazy_slru_download);
         patch
             .timeline_get_throttle

@@ -654,9 +697,13 @@ impl TenantConfig {
         patch
             .image_creation_preempt_threshold
             .apply(&mut image_creation_preempt_threshold);
-        patch.lsn_lease_length.apply(&mut lsn_lease_length);
+        patch
+            .lsn_lease_length
+            .map(|v| humantime::parse_duration(&v))?
+            .apply(&mut lsn_lease_length);
         patch
             .lsn_lease_length_for_ts
+            .map(|v| humantime::parse_duration(&v))?
             .apply(&mut lsn_lease_length_for_ts);
         patch.timeline_offloading.apply(&mut timeline_offloading);
         patch

@@ -673,7 +720,7 @@ impl TenantConfig {
             .gc_compaction_ratio_percent
             .apply(&mut gc_compaction_ratio_percent);
 
-        Self {
+        Ok(Self {
             checkpoint_distance,
             checkpoint_timeout,
             compaction_target_size,

@@ -709,7 +756,7 @@ impl TenantConfig {
             gc_compaction_enabled,
             gc_compaction_initial_threshold_kb,
             gc_compaction_ratio_percent,
-        }
+        })
     }
 }
 

@@ -2503,7 +2550,7 @@ mod tests {
         ..base.clone()
     };
 
-    let patched = base.apply_patch(decoded.config);
+    let patched = base.apply_patch(decoded.config).unwrap();
 
    assert_eq!(patched, expected);
 }
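Since the patch payload still carries durations as strings, `apply_patch` now returns `Result<TenantConfig, humantime::DurationError>` and invalid inputs fail up front instead of being stored verbatim. A quick illustration of the underlying parser behaviour (a sketch using only `humantime::parse_duration`, as in the diff; "5a" is the malformed value from the removed test):

```rust
use std::time::Duration;

fn main() {
    // Valid humantime strings parse to a Duration...
    assert_eq!(
        humantime::parse_duration("1h2m3s").unwrap(),
        Duration::from_secs(3723)
    );

    // ...while a malformed value such as "5a" is rejected, which is
    // what apply_patch now surfaces as an error to its caller.
    assert!(humantime::parse_duration("5a").is_err());
}
```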
@@ -693,16 +693,15 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
 /// This is a conversion from our internal tenant config object to the one used
 /// in external APIs.
 impl From<TenantConfOpt> for models::TenantConfig {
+    // TODO(vlad): These are now the same, but they have different serialization logic.
+    // Can we merge them?
     fn from(value: TenantConfOpt) -> Self {
-        fn humantime(d: Duration) -> String {
-            format!("{}s", d.as_secs())
-        }
         Self {
             checkpoint_distance: value.checkpoint_distance,
-            checkpoint_timeout: value.checkpoint_timeout.map(humantime),
+            checkpoint_timeout: value.checkpoint_timeout,
             compaction_algorithm: value.compaction_algorithm,
             compaction_target_size: value.compaction_target_size,
-            compaction_period: value.compaction_period.map(humantime),
+            compaction_period: value.compaction_period,
             compaction_threshold: value.compaction_threshold,
             compaction_upper_limit: value.compaction_upper_limit,
             compaction_l0_first: value.compaction_l0_first,

@@ -711,24 +710,23 @@ impl From<TenantConfOpt> for models::TenantConfig {
             l0_flush_stall_threshold: value.l0_flush_stall_threshold,
             l0_flush_wait_upload: value.l0_flush_wait_upload,
             gc_horizon: value.gc_horizon,
-            gc_period: value.gc_period.map(humantime),
+            gc_period: value.gc_period,
             image_creation_threshold: value.image_creation_threshold,
-            pitr_interval: value.pitr_interval.map(humantime),
-            walreceiver_connect_timeout: value.walreceiver_connect_timeout.map(humantime),
-            lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
+            pitr_interval: value.pitr_interval,
+            walreceiver_connect_timeout: value.walreceiver_connect_timeout,
+            lagging_wal_timeout: value.lagging_wal_timeout,
             max_lsn_wal_lag: value.max_lsn_wal_lag,
             eviction_policy: value.eviction_policy,
             min_resident_size_override: value.min_resident_size_override,
             evictions_low_residence_duration_metric_threshold: value
-                .evictions_low_residence_duration_metric_threshold
-                .map(humantime),
-            heatmap_period: value.heatmap_period.map(humantime),
+                .evictions_low_residence_duration_metric_threshold,
+            heatmap_period: value.heatmap_period,
             lazy_slru_download: value.lazy_slru_download,
             timeline_get_throttle: value.timeline_get_throttle,
             image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
             image_creation_preempt_threshold: value.image_creation_preempt_threshold,
-            lsn_lease_length: value.lsn_lease_length.map(humantime),
-            lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
+            lsn_lease_length: value.lsn_lease_length,
+            lsn_lease_length_for_ts: value.lsn_lease_length_for_ts,
             timeline_offloading: value.timeline_offloading,
             wal_receiver_protocol_override: value.wal_receiver_protocol_override,
             rel_size_v2_enabled: value.rel_size_v2_enabled,

@@ -760,29 +758,10 @@ mod tests {
         assert_eq!(small_conf, serde_json::from_str(&json_form).unwrap());
     }
 
-    #[test]
-    fn test_try_from_models_tenant_config_err() {
-        let tenant_config = models::TenantConfig {
-            lagging_wal_timeout: Some("5a".to_string()),
-            ..TenantConfig::default()
-        };
-
-        let tenant_conf_opt = TenantConfOpt::try_from(&tenant_config);
-
-        assert!(
-            tenant_conf_opt.is_err(),
-            "Suceeded to convert TenantConfig to TenantConfOpt"
-        );
-
-        let expected_error_str =
-            "lagging_wal_timeout: invalid value: string \"5a\", expected a duration";
-        assert_eq!(tenant_conf_opt.unwrap_err().to_string(), expected_error_str);
-    }
-
     #[test]
     fn test_try_from_models_tenant_config_success() {
         let tenant_config = models::TenantConfig {
-            lagging_wal_timeout: Some("5s".to_string()),
+            lagging_wal_timeout: Some(Duration::from_secs(5)),
             ..TenantConfig::default()
         };
 
@@ -24,6 +24,7 @@ hex.workspace = true
 hyper0.workspace = true
 humantime.workspace = true
 itertools.workspace = true
+json-structural-diff.workspace = true
 lasso.workspace = true
 once_cell.workspace = true
 pageserver_api.workspace = true
@@ -1,6 +1,7 @@
 use crate::pageserver_client::PageserverClient;
 use crate::persistence::Persistence;
 use crate::{compute_hook, service};
+use json_structural_diff::JsonDiff;
 use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
 use pageserver_api::models::{
     LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,

@@ -24,7 +25,7 @@ use crate::compute_hook::{ComputeHook, NotifyError};
 use crate::node::Node;
 use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
 
-const DEFAULT_HEATMAP_PERIOD: &str = "60s";
+const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);
 
 /// Object with the lifetime of the background reconcile task that is created
 /// for tenants which have a difference between their intent and observed states.

@@ -880,7 +881,27 @@ impl Reconciler {
             self.generation = Some(generation);
             wanted_conf.generation = generation.into();
         }
-        tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
+
+        let diff = match observed {
+            Some(ObservedStateLocation {
+                conf: Some(observed),
+            }) => {
+                let diff = JsonDiff::diff(
+                    &serde_json::to_value(observed.clone()).unwrap(),
+                    &serde_json::to_value(wanted_conf.clone()).unwrap(),
+                    false,
+                );
+
+                if let Some(json_diff) = diff.diff {
+                    serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
+                } else {
+                    "unknown".to_string()
+                }
+            }
+            _ => "full".to_string(),
+        };
+
+        tracing::info!(node_id=%node.get_id(), "Observed configuration requires update: {diff}");
 
         // Because `node` comes from a ref to &self, clone it before calling into a &mut self
         // function: this could be avoided by refactoring the state mutated by location_config into

@@ -1180,7 +1201,7 @@ fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig
     let mut config = config.clone();
     if has_secondaries {
         if config.heatmap_period.is_none() {
-            config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
+            config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
         }
     } else {
         config.heatmap_period = None;
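The new reconciler log line uses `json-structural-diff` to print only the keys that differ between the observed and wanted location configs. A standalone sketch of that call with toy values (the key names here are illustrative, and my reading of the boolean flag as "keep unchanged values" is an assumption; the call shape itself mirrors the hunk above):

```rust
use json_structural_diff::JsonDiff;
use serde_json::json;

fn main() {
    let observed = json!({ "mode": "AttachedSingle", "heatmap_period": null });
    let wanted = json!({ "mode": "AttachedSingle", "heatmap_period": "60s" });

    // Same call shape as the reconciler; `false` is assumed to request a
    // compact diff that omits values which did not change.
    let diff = JsonDiff::diff(&observed, &wanted, false);

    match diff.diff {
        Some(d) => println!("{}", serde_json::to_string(&d).unwrap()),
        None => println!("no difference"),
    }
}
```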
@@ -2926,7 +2926,9 @@ impl Service {
         first
     };
 
-    let updated_config = base.apply_patch(patch);
+    let updated_config = base
+        .apply_patch(patch)
+        .map_err(|err| ApiError::BadRequest(anyhow::anyhow!(err)))?;
     self.set_tenant_config_and_reconcile(tenant_id, updated_config)
         .await
 }

@@ -6654,11 +6656,12 @@ impl Service {
 ) -> Option<ReconcilerWaiter> {
     let reconcile_needed = shard.get_reconcile_needed(nodes);
 
-    match reconcile_needed {
+    let reconcile_reason = match reconcile_needed {
         ReconcileNeeded::No => return None,
         ReconcileNeeded::WaitExisting(waiter) => return Some(waiter),
-        ReconcileNeeded::Yes => {
+        ReconcileNeeded::Yes(reason) => {
             // Fall through to try and acquire units for spawning reconciler
+            reason
         }
     };
 

@@ -6697,6 +6700,7 @@ impl Service {
     };
 
     shard.spawn_reconciler(
+        reconcile_reason,
        &self.result_tx,
        nodes,
        &self.compute_hook,
@@ -481,7 +481,14 @@ pub(crate) enum ReconcileNeeded {
     /// spawned: wait for the existing reconciler rather than spawning a new one.
     WaitExisting(ReconcilerWaiter),
     /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
-    Yes,
+    Yes(ReconcileReason),
+}
+
+#[derive(Debug)]
+pub(crate) enum ReconcileReason {
+    ActiveNodesDirty,
+    UnknownLocation,
+    PendingComputeNotification,
 }
 
 /// Pending modification to the observed state of a tenant shard.

@@ -1341,12 +1348,18 @@ impl TenantShard {
 
     let active_nodes_dirty = self.dirty(pageservers);
 
     // Even if there is no pageserver work to be done, if we have a pending notification to computes,
     // wake up a reconciler to send it.
-    let do_reconcile =
-        active_nodes_dirty || dirty_observed || self.pending_compute_notification;
+    let reconcile_needed = match (
+        active_nodes_dirty,
+        dirty_observed,
+        self.pending_compute_notification,
+    ) {
+        (true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
+        (_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
+        (_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
+        _ => ReconcileNeeded::No,
+    };
 
-    if !do_reconcile {
+    if matches!(reconcile_needed, ReconcileNeeded::No) {
         tracing::debug!("Not dirty, no reconciliation needed.");
         return ReconcileNeeded::No;
     }

@@ -1389,7 +1402,7 @@ impl TenantShard {
         }
     }
 
-    ReconcileNeeded::Yes
+    reconcile_needed
 }
 
 /// Ensure the sequence number is set to a value where waiting for this value will make us wait

@@ -1479,6 +1492,7 @@ impl TenantShard {
 #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
 pub(crate) fn spawn_reconciler(
     &mut self,
+    reason: ReconcileReason,
     result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
     pageservers: &Arc<HashMap<NodeId, Node>>,
     compute_hook: &Arc<ComputeHook>,

@@ -1538,7 +1552,7 @@ impl TenantShard {
     let reconcile_seq = self.sequence;
     let long_reconcile_threshold = service_config.long_reconcile_threshold;
 
-    tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
+    tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
     let must_notify = self.pending_compute_notification;
     let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
        tenant_id=%reconciler.tenant_shard_id.tenant_id,
@@ -3817,3 +3817,43 @@ def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder):
     nodes = env.storage_controller.node_list()
     assert len(nodes) == 1
     assert nodes[0]["listen_https_port"] is None
+
+
+def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvBuilder):
+    """
+    Validate that a storage controller restart with no shards in a transient state
+    performs zero reconciliations at start-up. Implicitly, this means that the location
+    configs returned by the pageserver are identical to the persisted state in the
+    storage controller database.
+    """
+    neon_env_builder.num_pageservers = 1
+    neon_env_builder.storage_controller_config = {
+        "start_as_candidate": False,
+    }
+
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    tenant_id = TenantId.generate()
+    env.storage_controller.tenant_create(
+        tenant_id, shard_count=2, tenant_config={"pitr_interval": "1h2m3s"}
+    )
+
+    env.storage_controller.reconcile_until_idle()
+
+    reconciles_before_restart = env.storage_controller.get_metric_value(
+        "storage_controller_reconcile_complete_total", filter={"status": "ok"}
+    )
+
+    assert reconciles_before_restart != 0
+
+    env.storage_controller.stop()
+    env.storage_controller.start()
+
+    env.storage_controller.reconcile_until_idle()
+
+    reconciles_after_restart = env.storage_controller.get_metric_value(
+        "storage_controller_reconcile_complete_total", filter={"status": "ok"}
+    )
+
+    assert reconciles_after_restart == 0