diff --git a/libs/utils/src/circuit_breaker.rs b/libs/utils/src/circuit_breaker.rs new file mode 100644 index 0000000000..720ea39d4f --- /dev/null +++ b/libs/utils/src/circuit_breaker.rs @@ -0,0 +1,114 @@ +use std::{ + fmt::Display, + time::{Duration, Instant}, +}; + +use metrics::IntCounter; + +/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly, +/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and +/// to mitigate the log spam from repeated failures. +pub struct CircuitBreaker { + /// An identifier that enables us to log useful errors when a circuit is broken + name: String, + + /// Consecutive failures since last success + fail_count: usize, + + /// How many consecutive failures before we break the circuit + fail_threshold: usize, + + /// If circuit is broken, when was it broken? + broken_at: Option, + + /// If set, we will auto-reset the circuit this long after it was broken. If None, broken + /// circuits stay broken forever, or until success() is called. + reset_period: Option, + + /// If this is true, no actual circuit-breaking happens. This is for overriding a circuit breaker + /// to permit something to keep running even if it would otherwise have tripped it. + short_circuit: bool, +} + +impl CircuitBreaker { + pub fn new(name: String, fail_threshold: usize, reset_period: Option) -> Self { + Self { + name, + fail_count: 0, + fail_threshold, + broken_at: None, + reset_period, + short_circuit: false, + } + } + + /// Construct an unbreakable circuit breaker, for use in unit tests etc. + pub fn short_circuit() -> Self { + Self { + name: String::new(), + fail_threshold: 0, + fail_count: 0, + broken_at: None, + reset_period: None, + short_circuit: true, + } + } + + pub fn fail(&mut self, metric: &IntCounter, error: E) + where + E: Display, + { + if self.short_circuit { + return; + } + + self.fail_count += 1; + if self.broken_at.is_none() && self.fail_count >= self.fail_threshold { + self.break_circuit(metric, error); + } + } + + /// Call this after successfully executing an operation + pub fn success(&mut self, metric: &IntCounter) { + self.fail_count = 0; + if let Some(broken_at) = &self.broken_at { + tracing::info!(breaker=%self.name, "Circuit breaker failure ended (was broken for {})", + humantime::format_duration(broken_at.elapsed())); + self.broken_at = None; + metric.inc(); + } + } + + /// Call this before attempting an operation, and skip the operation if we are currently broken. + pub fn is_broken(&mut self) -> bool { + if self.short_circuit { + return false; + } + + if let Some(broken_at) = self.broken_at { + match self.reset_period { + Some(reset_period) if broken_at.elapsed() > reset_period => { + self.reset_circuit(); + false + } + _ => true, + } + } else { + false + } + } + + fn break_circuit(&mut self, metric: &IntCounter, error: E) + where + E: Display, + { + self.broken_at = Some(Instant::now()); + tracing::error!(breaker=%self.name, "Circuit breaker broken! Last error: {error}"); + metric.inc(); + } + + fn reset_circuit(&mut self) { + self.broken_at = None; + self.fail_count = 0; + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 711e617801..9ad1752fb7 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -98,6 +98,8 @@ pub mod poison; pub mod toml_edit_ext; +pub mod circuit_breaker; + /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages /// /// we have several cases: diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index e67fa656d0..9b3bb481b9 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -569,6 +569,22 @@ static VALID_LSN_LEASE_COUNT: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +pub(crate) static CIRCUIT_BREAKERS_BROKEN: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_circuit_breaker_broken", + "How many times a circuit breaker has broken" + ) + .expect("failed to define a metric") +}); + +pub(crate) static CIRCUIT_BREAKERS_UNBROKEN: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_circuit_breaker_unbroken", + "How many times a circuit breaker has been un-broken (recovered)" + ) + .expect("failed to define a metric") +}); + pub(crate) mod initial_logical_size { use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; use once_cell::sync::Lazy; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index bf23513527..6333fd3b63 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -39,6 +39,7 @@ use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::*; use utils::backoff; +use utils::circuit_breaker::CircuitBreaker; use utils::completion; use utils::crashsafe::path_with_suffix_extension; use utils::failpoint_support; @@ -76,7 +77,8 @@ use crate::is_uninit_mark; use crate::l0_flush::L0FlushGlobalState; use crate::metrics::TENANT; use crate::metrics::{ - remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, + remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, + TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, }; use crate::repository::GcResult; use crate::task_mgr; @@ -276,6 +278,10 @@ pub struct Tenant { eviction_task_tenant_state: tokio::sync::Mutex, + /// Track repeated failures to compact, so that we can back off. + /// Overhead of mutex is acceptable because compaction is done with a multi-second period. + compaction_circuit_breaker: std::sync::Mutex, + /// If the tenant is in Activating state, notify this to encourage it /// to proceed to Active as soon as possible, rather than waiting for lazy /// background warmup. @@ -1641,13 +1647,31 @@ impl Tenant { timelines_to_compact }; + // Before doing any I/O work, check our circuit breaker + if self.compaction_circuit_breaker.lock().unwrap().is_broken() { + info!("Skipping compaction due to previous failures"); + return Ok(()); + } + for (timeline_id, timeline) in &timelines_to_compact { timeline .compact(cancel, EnumSet::empty(), ctx) .instrument(info_span!("compact_timeline", %timeline_id)) - .await?; + .await + .map_err(|e| { + self.compaction_circuit_breaker + .lock() + .unwrap() + .fail(&CIRCUIT_BREAKERS_BROKEN, &e); + e + })?; } + self.compaction_circuit_breaker + .lock() + .unwrap() + .success(&CIRCUIT_BREAKERS_UNBROKEN); + Ok(()) } @@ -2563,6 +2587,14 @@ impl Tenant { cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()), cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()), + compaction_circuit_breaker: std::sync::Mutex::new(CircuitBreaker::new( + format!("compaction-{tenant_shard_id}"), + 5, + // Compaction can be a very expensive operation, and might leak disk space. It also ought + // to be infallible, as long as remote storage is available. So if it repeatedly fails, + // use an extremely long backoff. + Some(Duration::from_secs(3600 * 24)), + )), activate_now_sem: tokio::sync::Semaphore::new(0), cancel: CancellationToken::default(), gate: Gate::default(), diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 49dcb9b86a..f321c09b27 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -1,12 +1,14 @@ import enum import json import os +import time from typing import Optional import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder, generate_uploads_and_deletions from fixtures.pageserver.http import PageserverApiException +from fixtures.utils import wait_until from fixtures.workload import Workload AGGRESIVE_COMPACTION_TENANT_CONF = { @@ -257,3 +259,64 @@ def test_uploads_and_deletions( found_allowed_error = any(env.pageserver.log_contains(e) for e in allowed_errors) if not found_allowed_error: raise Exception("None of the allowed_errors occured in the log") + + +def test_pageserver_compaction_circuit_breaker(neon_env_builder: NeonEnvBuilder): + """ + Check that repeated failures in compaction result in a circuit breaker breaking + """ + TENANT_CONF = { + # Very frequent runs to rack up failures quickly + "compaction_period": "100ms", + # Small checkpoint distance to create many layers + "checkpoint_distance": 1024 * 128, + # Compact small layers + "compaction_target_size": 1024 * 128, + "image_creation_threshold": 1, + } + + FAILPOINT = "delta-layer-writer-fail-before-finish" + BROKEN_LOG = ".*Circuit breaker broken!.*" + + env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) + + workload = Workload(env, env.initial_tenant, env.initial_timeline) + workload.init() + + # Set a failpoint that will prevent compaction succeeding + env.pageserver.http_client().configure_failpoints((FAILPOINT, "return")) + + # Write some data to trigger compaction + workload.write_rows(1024, upload=False) + workload.write_rows(1024, upload=False) + workload.write_rows(1024, upload=False) + + def assert_broken(): + env.pageserver.assert_log_contains(BROKEN_LOG) + assert ( + env.pageserver.http_client().get_metric_value("pageserver_circuit_breaker_broken_total") + or 0 + ) == 1 + assert ( + env.pageserver.http_client().get_metric_value( + "pageserver_circuit_breaker_unbroken_total" + ) + or 0 + ) == 0 + + # Wait for enough failures to break the circuit breaker + # This wait is fairly long because we back off on compaction failures, so 5 retries takes ~30s + wait_until(60, 1, assert_broken) + + # Sleep for a while, during which time we expect that compaction will _not_ be retried + time.sleep(10) + + assert ( + env.pageserver.http_client().get_metric_value("pageserver_circuit_breaker_broken_total") + or 0 + ) == 1 + assert ( + env.pageserver.http_client().get_metric_value("pageserver_circuit_breaker_unbroken_total") + or 0 + ) == 0 + assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")