mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 16:32:56 +00:00
pageserver: circuit breaker on compaction (#8359)
## Problem We already back off on compaction retries, but the impact of a failing compaction can be so great that backing off up to 300s isn't enough. The impact is consuming a lot of I/O+CPU in the case of image layer generation for large tenants, and potentially also leaking disk space. Compaction failures are extremely rare and almost always indicate a bug, frequently a bug that will not let compaction to proceed until it is fixed. Related: https://github.com/neondatabase/neon/issues/6738 ## Summary of changes - Introduce a CircuitBreaker type - Add a circuit breaker for compaction, with a policy that after 5 failures, compaction will not be attempted again for 24 hours. - Add metrics that we can alert on: any >0 value for `pageserver_circuit_breaker_broken_total` should generate an alert. - Add a test that checks this works as intended. Couple notes to reviewers: - Circuit breakers are intrinsically a defense-in-depth measure: this is not the solution to any underlying issues, it is just a general mitigation for "unknown unknowns" that might be encountered in future. - This PR isn't primarily about writing a perfect CircuitBreaker type: the one in this PR is meant to be just enough to mitigate issues in compaction, and make it easy to monitor/alert on these failures. We can refine this type in future as/when we want to use it elsewhere.
This commit is contained in:
114
libs/utils/src/circuit_breaker.rs
Normal file
114
libs/utils/src/circuit_breaker.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
use std::{
|
||||
fmt::Display,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use metrics::IntCounter;
|
||||
|
||||
/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly,
|
||||
/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and
|
||||
/// to mitigate the log spam from repeated failures.
|
||||
pub struct CircuitBreaker {
|
||||
/// An identifier that enables us to log useful errors when a circuit is broken
|
||||
name: String,
|
||||
|
||||
/// Consecutive failures since last success
|
||||
fail_count: usize,
|
||||
|
||||
/// How many consecutive failures before we break the circuit
|
||||
fail_threshold: usize,
|
||||
|
||||
/// If circuit is broken, when was it broken?
|
||||
broken_at: Option<Instant>,
|
||||
|
||||
/// If set, we will auto-reset the circuit this long after it was broken. If None, broken
|
||||
/// circuits stay broken forever, or until success() is called.
|
||||
reset_period: Option<Duration>,
|
||||
|
||||
/// If this is true, no actual circuit-breaking happens. This is for overriding a circuit breaker
|
||||
/// to permit something to keep running even if it would otherwise have tripped it.
|
||||
short_circuit: bool,
|
||||
}
|
||||
|
||||
impl CircuitBreaker {
|
||||
pub fn new(name: String, fail_threshold: usize, reset_period: Option<Duration>) -> Self {
|
||||
Self {
|
||||
name,
|
||||
fail_count: 0,
|
||||
fail_threshold,
|
||||
broken_at: None,
|
||||
reset_period,
|
||||
short_circuit: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct an unbreakable circuit breaker, for use in unit tests etc.
|
||||
pub fn short_circuit() -> Self {
|
||||
Self {
|
||||
name: String::new(),
|
||||
fail_threshold: 0,
|
||||
fail_count: 0,
|
||||
broken_at: None,
|
||||
reset_period: None,
|
||||
short_circuit: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fail<E>(&mut self, metric: &IntCounter, error: E)
|
||||
where
|
||||
E: Display,
|
||||
{
|
||||
if self.short_circuit {
|
||||
return;
|
||||
}
|
||||
|
||||
self.fail_count += 1;
|
||||
if self.broken_at.is_none() && self.fail_count >= self.fail_threshold {
|
||||
self.break_circuit(metric, error);
|
||||
}
|
||||
}
|
||||
|
||||
/// Call this after successfully executing an operation
|
||||
pub fn success(&mut self, metric: &IntCounter) {
|
||||
self.fail_count = 0;
|
||||
if let Some(broken_at) = &self.broken_at {
|
||||
tracing::info!(breaker=%self.name, "Circuit breaker failure ended (was broken for {})",
|
||||
humantime::format_duration(broken_at.elapsed()));
|
||||
self.broken_at = None;
|
||||
metric.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/// Call this before attempting an operation, and skip the operation if we are currently broken.
|
||||
pub fn is_broken(&mut self) -> bool {
|
||||
if self.short_circuit {
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(broken_at) = self.broken_at {
|
||||
match self.reset_period {
|
||||
Some(reset_period) if broken_at.elapsed() > reset_period => {
|
||||
self.reset_circuit();
|
||||
false
|
||||
}
|
||||
_ => true,
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn break_circuit<E>(&mut self, metric: &IntCounter, error: E)
|
||||
where
|
||||
E: Display,
|
||||
{
|
||||
self.broken_at = Some(Instant::now());
|
||||
tracing::error!(breaker=%self.name, "Circuit breaker broken! Last error: {error}");
|
||||
metric.inc();
|
||||
}
|
||||
|
||||
fn reset_circuit(&mut self) {
|
||||
self.broken_at = None;
|
||||
self.fail_count = 0;
|
||||
}
|
||||
}
|
||||
@@ -98,6 +98,8 @@ pub mod poison;
|
||||
|
||||
pub mod toml_edit_ext;
|
||||
|
||||
pub mod circuit_breaker;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
///
|
||||
/// we have several cases:
|
||||
|
||||
@@ -569,6 +569,22 @@ static VALID_LSN_LEASE_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static CIRCUIT_BREAKERS_BROKEN: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_circuit_breaker_broken",
|
||||
"How many times a circuit breaker has broken"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static CIRCUIT_BREAKERS_UNBROKEN: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_circuit_breaker_unbroken",
|
||||
"How many times a circuit breaker has been un-broken (recovered)"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) mod initial_logical_size {
|
||||
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -39,6 +39,7 @@ use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::backoff;
|
||||
use utils::circuit_breaker::CircuitBreaker;
|
||||
use utils::completion;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::failpoint_support;
|
||||
@@ -76,7 +77,8 @@ use crate::is_uninit_mark;
|
||||
use crate::l0_flush::L0FlushGlobalState;
|
||||
use crate::metrics::TENANT;
|
||||
use crate::metrics::{
|
||||
remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
|
||||
remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN,
|
||||
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
|
||||
};
|
||||
use crate::repository::GcResult;
|
||||
use crate::task_mgr;
|
||||
@@ -276,6 +278,10 @@ pub struct Tenant {
|
||||
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
|
||||
/// Track repeated failures to compact, so that we can back off.
|
||||
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
|
||||
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
|
||||
|
||||
/// If the tenant is in Activating state, notify this to encourage it
|
||||
/// to proceed to Active as soon as possible, rather than waiting for lazy
|
||||
/// background warmup.
|
||||
@@ -1641,13 +1647,31 @@ impl Tenant {
|
||||
timelines_to_compact
|
||||
};
|
||||
|
||||
// Before doing any I/O work, check our circuit breaker
|
||||
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
|
||||
info!("Skipping compaction due to previous failures");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for (timeline_id, timeline) in &timelines_to_compact {
|
||||
timeline
|
||||
.compact(cancel, EnumSet::empty(), ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, &e);
|
||||
e
|
||||
})?;
|
||||
}
|
||||
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.success(&CIRCUIT_BREAKERS_UNBROKEN);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -2563,6 +2587,14 @@ impl Tenant {
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
compaction_circuit_breaker: std::sync::Mutex::new(CircuitBreaker::new(
|
||||
format!("compaction-{tenant_shard_id}"),
|
||||
5,
|
||||
// Compaction can be a very expensive operation, and might leak disk space. It also ought
|
||||
// to be infallible, as long as remote storage is available. So if it repeatedly fails,
|
||||
// use an extremely long backoff.
|
||||
Some(Duration::from_secs(3600 * 24)),
|
||||
)),
|
||||
activate_now_sem: tokio::sync::Semaphore::new(0),
|
||||
cancel: CancellationToken::default(),
|
||||
gate: Gate::default(),
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
import enum
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, generate_uploads_and_deletions
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
|
||||
AGGRESIVE_COMPACTION_TENANT_CONF = {
|
||||
@@ -257,3 +259,64 @@ def test_uploads_and_deletions(
|
||||
found_allowed_error = any(env.pageserver.log_contains(e) for e in allowed_errors)
|
||||
if not found_allowed_error:
|
||||
raise Exception("None of the allowed_errors occured in the log")
|
||||
|
||||
|
||||
def test_pageserver_compaction_circuit_breaker(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Check that repeated failures in compaction result in a circuit breaker breaking
|
||||
"""
|
||||
TENANT_CONF = {
|
||||
# Very frequent runs to rack up failures quickly
|
||||
"compaction_period": "100ms",
|
||||
# Small checkpoint distance to create many layers
|
||||
"checkpoint_distance": 1024 * 128,
|
||||
# Compact small layers
|
||||
"compaction_target_size": 1024 * 128,
|
||||
"image_creation_threshold": 1,
|
||||
}
|
||||
|
||||
FAILPOINT = "delta-layer-writer-fail-before-finish"
|
||||
BROKEN_LOG = ".*Circuit breaker broken!.*"
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
|
||||
workload = Workload(env, env.initial_tenant, env.initial_timeline)
|
||||
workload.init()
|
||||
|
||||
# Set a failpoint that will prevent compaction succeeding
|
||||
env.pageserver.http_client().configure_failpoints((FAILPOINT, "return"))
|
||||
|
||||
# Write some data to trigger compaction
|
||||
workload.write_rows(1024, upload=False)
|
||||
workload.write_rows(1024, upload=False)
|
||||
workload.write_rows(1024, upload=False)
|
||||
|
||||
def assert_broken():
|
||||
env.pageserver.assert_log_contains(BROKEN_LOG)
|
||||
assert (
|
||||
env.pageserver.http_client().get_metric_value("pageserver_circuit_breaker_broken_total")
|
||||
or 0
|
||||
) == 1
|
||||
assert (
|
||||
env.pageserver.http_client().get_metric_value(
|
||||
"pageserver_circuit_breaker_unbroken_total"
|
||||
)
|
||||
or 0
|
||||
) == 0
|
||||
|
||||
# Wait for enough failures to break the circuit breaker
|
||||
# This wait is fairly long because we back off on compaction failures, so 5 retries takes ~30s
|
||||
wait_until(60, 1, assert_broken)
|
||||
|
||||
# Sleep for a while, during which time we expect that compaction will _not_ be retried
|
||||
time.sleep(10)
|
||||
|
||||
assert (
|
||||
env.pageserver.http_client().get_metric_value("pageserver_circuit_breaker_broken_total")
|
||||
or 0
|
||||
) == 1
|
||||
assert (
|
||||
env.pageserver.http_client().get_metric_value("pageserver_circuit_breaker_unbroken_total")
|
||||
or 0
|
||||
) == 0
|
||||
assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")
|
||||
|
||||
Reference in New Issue
Block a user