mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-13 11:10:36 +00:00
Compare commits
4 Commits
release-pr
...
jcsp/pages
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2183290939 | ||
|
|
f8d8c656fc | ||
|
|
661d5b7c68 | ||
|
|
fe21c7fe19 |
@@ -400,6 +400,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'lazy_slru_download' as bool")?,
|
||||
enforce_circuit_breakers: settings
|
||||
.remove("enforce_circuit_breakers")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'enforce_circuit_breakers' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -505,6 +510,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'lazy_slru_download' as bool")?,
|
||||
enforce_circuit_breakers: settings
|
||||
.remove("enforce_circuit_breakers")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'enforce_circuit_breakers' as bool")?,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -283,6 +283,7 @@ pub struct TenantConfig {
|
||||
pub gc_feedback: Option<bool>,
|
||||
pub heatmap_period: Option<String>,
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
pub enforce_circuit_breakers: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
|
||||
90
libs/utils/src/circuit_breaker.rs
Normal file
90
libs/utils/src/circuit_breaker.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly,
|
||||
/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and
|
||||
/// to mitigate the log spam from repeated failures.
|
||||
pub struct CircuitBreaker {
|
||||
/// Consecutive failures since last success
|
||||
fail_count: usize,
|
||||
|
||||
/// How many consecutive failures before we break the circuit
|
||||
fail_threshold: usize,
|
||||
|
||||
/// If circuit is broken, when was it broken?
|
||||
broken_at: Option<Instant>,
|
||||
|
||||
/// If set, we will auto-reset the circuit this long after it was broken. If None, broken
|
||||
/// circuits stay broken forever, or until success() is called.
|
||||
reset_period: Option<Duration>,
|
||||
|
||||
/// If this is true, no actual circuit-breaking happens. This is for overriding a circuit breaker
|
||||
/// to permit something to keep running even if it would otherwise have tripped it.
|
||||
short_circuit: bool,
|
||||
}
|
||||
|
||||
impl CircuitBreaker {
|
||||
pub fn new(fail_threshold: usize, reset_period: Option<Duration>) -> Self {
|
||||
Self {
|
||||
fail_count: 0,
|
||||
fail_threshold,
|
||||
broken_at: None,
|
||||
reset_period,
|
||||
short_circuit: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn short_circuit() -> Self {
|
||||
Self {
|
||||
fail_threshold: 0,
|
||||
fail_count: 0,
|
||||
broken_at: None,
|
||||
reset_period: None,
|
||||
short_circuit: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fail(&mut self) {
|
||||
if self.short_circuit {
|
||||
return;
|
||||
}
|
||||
|
||||
self.fail_count += 1;
|
||||
if self.broken_at.is_none() && self.fail_count >= self.fail_threshold {
|
||||
self.break_circuit();
|
||||
}
|
||||
}
|
||||
|
||||
/// Call this after successfully executing an operation
|
||||
pub fn success(&mut self) {
|
||||
self.fail_count = 0;
|
||||
self.broken_at = None;
|
||||
}
|
||||
|
||||
/// Call this before attempting an operation, and skip the operation if we are currently broken.
|
||||
pub fn is_broken(&mut self) -> bool {
|
||||
if self.short_circuit {
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(broken_at) = self.broken_at {
|
||||
match self.reset_period {
|
||||
Some(reset_period) if broken_at.elapsed() > reset_period => {
|
||||
self.reset_circuit();
|
||||
false
|
||||
}
|
||||
_ => true,
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn break_circuit(&mut self) {
|
||||
self.broken_at = Some(Instant::now())
|
||||
}
|
||||
|
||||
fn reset_circuit(&mut self) {
|
||||
self.broken_at = None;
|
||||
self.fail_count = 0;
|
||||
}
|
||||
}
|
||||
@@ -87,6 +87,8 @@ pub mod failpoint_support;
|
||||
|
||||
pub mod yielding_loop;
|
||||
|
||||
pub mod circuit_breaker;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
///
|
||||
/// we have several cases:
|
||||
|
||||
@@ -34,6 +34,7 @@ use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::backoff;
|
||||
use utils::circuit_breaker::CircuitBreaker;
|
||||
use utils::completion;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::failpoint_support;
|
||||
@@ -293,6 +294,10 @@ pub struct Tenant {
|
||||
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
|
||||
/// Track repeated failures to compact, so that we can back off.
|
||||
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
|
||||
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
|
||||
|
||||
/// If the tenant is in Activating state, notify this to encourage it
|
||||
/// to proceed to Active as soon as possible, rather than waiting for lazy
|
||||
/// background warmup.
|
||||
@@ -1936,11 +1941,68 @@ impl Tenant {
|
||||
timelines_to_compact
|
||||
};
|
||||
|
||||
// Before doing any I/O work, check our circuit breaker
|
||||
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut total_physical = 0;
|
||||
for (timeline_id, timeline) in &timelines_to_compact {
|
||||
timeline
|
||||
let timeline_result = timeline
|
||||
.compact(cancel, EnumSet::empty(), ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await?;
|
||||
.await;
|
||||
|
||||
if let Some(remote_client) = &timeline.remote_client {
|
||||
total_physical += remote_client.get_remote_physical_size();
|
||||
}
|
||||
|
||||
if timeline_result.is_err() {
|
||||
self.compaction_circuit_breaker.lock().unwrap().fail();
|
||||
}
|
||||
|
||||
timeline_result?;
|
||||
}
|
||||
|
||||
self.compaction_circuit_breaker.lock().unwrap().success();
|
||||
|
||||
// Circuit breaker: if a timeline's statistics indicate a pathological storage issue, such
|
||||
// as extremely high write inflation, then we will stop ingesting data for that timeline. This
|
||||
// reduces the blast radius of postgres/walingest bugs that might enable one tenant to generate
|
||||
// an extremely large storage size, and thereby interfere with other tenants on the same pageserver.
|
||||
let synthetic_size = self.cached_synthetic_tenant_size.load(Ordering::Relaxed);
|
||||
if synthetic_size > 0 {
|
||||
let amplification = total_physical as f64 / synthetic_size as f64;
|
||||
|
||||
// We only try to evaluate amplification once synthetic size reaches some threshold, to avoid
|
||||
// noisy results on very small/new tenants.
|
||||
const SIZE_THRESHOLD_FOR_AMPLIFICATION_CHECK: u64 = 1000000000;
|
||||
|
||||
// Typical storage amplification is something like 3x-10x. 100x would be really extreme.
|
||||
// 1000x is unthinkable: if we see an amplification this extreme, then something bad and
|
||||
// dangerous is going on.
|
||||
const PATHOLOGICAL_AMPLIFICATION_FACTOR: f64 = 1000.0;
|
||||
|
||||
if synthetic_size > SIZE_THRESHOLD_FOR_AMPLIFICATION_CHECK
|
||||
&& amplification > PATHOLOGICAL_AMPLIFICATION_FACTOR
|
||||
{
|
||||
tracing::error!("Pathological storage amplification detected (synthetic size {synthetic_size}, physical size {total_physical}): shutting down ingest");
|
||||
if self.get_enforce_circuit_breakers() {
|
||||
for (timeline_id, timeline) in timelines_to_compact {
|
||||
if tokio::time::timeout(
|
||||
Duration::from_secs(5),
|
||||
timeline.kill_wal_receiver(),
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
tracing::error!(
|
||||
"Timed out shutting down WAL intest on timeline {timeline_id}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -2593,6 +2655,16 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_enforce_circuit_breakers(&self) -> bool {
|
||||
let tenant_conf = self
|
||||
.tenant_conf
|
||||
.read()
|
||||
.unwrap()
|
||||
.tenant_conf
|
||||
.enforce_circuit_breakers;
|
||||
tenant_conf.unwrap_or(self.conf.default_tenant_conf.enforce_circuit_breakers)
|
||||
}
|
||||
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
@@ -2752,6 +2824,10 @@ impl Tenant {
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
compaction_circuit_breaker: std::sync::Mutex::new(CircuitBreaker::new(
|
||||
5,
|
||||
Some(Duration::from_secs(3600)),
|
||||
)),
|
||||
activate_now_sem: tokio::sync::Semaphore::new(0),
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
|
||||
cancel: CancellationToken::default(),
|
||||
@@ -3962,6 +4038,7 @@ pub(crate) mod harness {
|
||||
gc_feedback: Some(tenant_conf.gc_feedback),
|
||||
heatmap_period: Some(tenant_conf.heatmap_period),
|
||||
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
|
||||
enforce_circuit_breakers: Some(tenant_conf.enforce_circuit_breakers),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,6 +49,8 @@ pub mod defaults {
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
pub const DEFAULT_ENFORCE_CIRCUIT_BREAKERS: bool = false;
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
@@ -348,6 +350,10 @@ pub struct TenantConf {
|
||||
|
||||
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
|
||||
pub lazy_slru_download: bool,
|
||||
|
||||
/// If true, then the tenant will automatically shut off external APIs (e.g. wal ingest, page service) in
|
||||
/// response to high failure rates that likely indicate a bug.
|
||||
pub enforce_circuit_breakers: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -437,6 +443,10 @@ pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub enforce_circuit_breakers: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -485,6 +495,9 @@ impl TenantConfOpt {
|
||||
lazy_slru_download: self
|
||||
.lazy_slru_download
|
||||
.unwrap_or(global_conf.lazy_slru_download),
|
||||
enforce_circuit_breakers: self
|
||||
.enforce_circuit_breakers
|
||||
.unwrap_or(global_conf.enforce_circuit_breakers),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -524,6 +537,7 @@ impl Default for TenantConf {
|
||||
gc_feedback: false,
|
||||
heatmap_period: Duration::ZERO,
|
||||
lazy_slru_download: false,
|
||||
enforce_circuit_breakers: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -596,6 +610,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
gc_feedback: value.gc_feedback,
|
||||
heatmap_period: value.heatmap_period.map(humantime),
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
enforce_circuit_breakers: value.enforce_circuit_breakers,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1655,6 +1655,17 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
/// For terminating wal ingestion without tearing down the rest of the Timeline (i.e. reads to
|
||||
/// already ingested data should still work)
|
||||
pub(super) async fn kill_wal_receiver(&self) {
|
||||
task_mgr::shutdown_tasks(
|
||||
Some(TaskKind::WalReceiverManager),
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Initialize with an empty layer map. Used when creating a new timeline.
|
||||
pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
|
||||
let mut layers = self.layers.try_write().expect(
|
||||
|
||||
@@ -174,6 +174,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"pitr_interval": "1m",
|
||||
"lagging_wal_timeout": "23m",
|
||||
"lazy_slru_download": True,
|
||||
"enforce_circuit_breakers": True,
|
||||
"max_lsn_wal_lag": 230000,
|
||||
"min_resident_size_override": 23,
|
||||
"trace_read_requests": True,
|
||||
|
||||
Reference in New Issue
Block a user