Compare commits

..

4 Commits

Author SHA1 Message Date
John Spray
1d18b74324 storcon: add rate limiting for proxied API requests 2025-02-14 00:10:43 +01:00
Alexander Bayandin
3e8bf2159d CI(build-and-test): run benchmarks after deploy job (#10791)
## Problem

`benchmarks` is a long-running and non-blocking job. If, on Staging, a
deploy-blocking job fails, restarting it requires cancelling any running
`benchmarks` jobs, which is a waste of CI resources and requires a
couple of extra clicks for a human to do.

Ref: https://neondb.slack.com/archives/C059ZC138NR/p1739292995400899

## Summary of changes
- Run `benchmarks` after `deploy` job
- Handle `benchmarks` run in PRs with `run-benchmarks` label but without
`deploy` job.
2025-02-13 22:03:47 +00:00
Arpad Müller
5008324460 Fix utilization URL and ensure heartbeats work (#10811)
There was a typo in the name of the utilization endpoint URL, fix it.
Also, ensure that the heartbeat mechanism actually works.

Related: #10583, #10429

Part of #9011
2025-02-13 20:55:53 +00:00
Christian Schwarz
487f3202fe pageserver read path: abort on fatal IO errors from disk / filesystem (#10786)
Before this PR, an IO error returned from the kernel, e.g., due to a bad
disk, would get bubbled up, all the way to a user-visible query failing.

This is against the IO error handling policy where we have established
and is hence being rectified in this PR.
[[(internal Policy document
link)]](bef44149f7/src/storage/handling_io_and_logical_errors.md (L33-L35))

The practice on the write path seems to be that we call
`maybe_fatal_err()` or `fatal_err()` fairly high up the stack.
That is, regardless of whether std::fs, tokio::fs, or VirtualFile is
used to perform the IO.

For the read path, I choose a centralized approach in this PR by
checking for errors as close to the kernel interface as possible.
I believe this is better for long-term consistency.

To mitigate the problem of missing context if we abort so far down in
the stack, the `on_fatal_io_error` now captures and logs a backtrace.

I grepped the pageserver code base for `fs::read` to convince myself
that all non-VirtualFile reads already handle IO errors according to
policy.

Refs

- fixes https://github.com/neondatabase/neon/issues/10454
2025-02-13 20:53:39 +00:00
9 changed files with 143 additions and 89 deletions

View File

@@ -263,8 +263,9 @@ jobs:
echo "json=$(jq --compact-output '.' /tmp/benchmark_durations.json)" >> $GITHUB_OUTPUT
benchmarks:
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
needs: [ check-permissions, build-and-test-locally, build-build-tools-image, get-benchmarks-durations ]
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `deploy` in PRs
if: github.ref_name == 'main' || (contains(github.event.pull_request.labels.*.name, 'run-benchmarks') && !failure() && !cancelled())
needs: [ check-permissions, build-build-tools-image, get-benchmarks-durations, deploy ]
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write

77
Cargo.lock generated
View File

@@ -2398,9 +2398,9 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.2"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
@@ -2503,6 +2503,27 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "governor"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
dependencies = [
"cfg-if",
"dashmap 6.1.0",
"futures-sink",
"futures-timer",
"futures-util",
"no-std-compat",
"nonzero_ext",
"parking_lot 0.12.1",
"portable-atomic",
"quanta",
"rand 0.8.5",
"smallvec",
"spinning_top",
]
[[package]]
name = "group"
version = "0.12.1"
@@ -3702,6 +3723,12 @@ dependencies = [
"memoffset 0.9.0",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "7.1.3"
@@ -3712,6 +3739,12 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "notify"
version = "8.0.0"
@@ -4570,6 +4603,12 @@ dependencies = [
"never-say-never",
]
[[package]]
name = "portable-atomic"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
[[package]]
name = "postgres"
version = "0.19.7"
@@ -5036,6 +5075,21 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "quanta"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quick-xml"
version = "0.26.0"
@@ -5166,6 +5220,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "raw-cpuid"
version = "11.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e"
dependencies = [
"bitflags 2.8.0",
]
[[package]]
name = "rayon"
version = "1.7.0"
@@ -6374,6 +6437,15 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "spinning_top"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.6.0"
@@ -6449,6 +6521,7 @@ dependencies = [
"diesel_migrations",
"fail",
"futures",
"governor",
"hex",
"http-utils",
"humantime",

View File

@@ -283,53 +283,6 @@ pub(crate) enum SpawnMode {
Lazy,
}
/// A notifier that can be used to trigger compaction shared by all timelines of a tenant.
///
/// It is used to notify the compaction loop that there is work to be done. It is also used
/// to balance the load of compaction between timelines. If there are pending L0 compaction in
/// one of the timeline, it could preempt long-running compaction jobs (e.g., image compaction)
/// on other timelines.
pub struct CompactionNotifier {
notify: Notify,
l0_count: std::sync::Mutex<HashMap<TimelineId, usize>>,
}
impl CompactionNotifier {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {
notify: Notify::new(),
l0_count: std::sync::Mutex::new(HashMap::new()),
}
}
pub fn notify_one(&self) {
self.notify.notify_one();
}
pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
self.notify.notified()
}
pub fn on_l0_update(&self, timeline_id: TimelineId, l0_count: usize) {
let mut guard = self.l0_count.lock().unwrap();
guard.insert(timeline_id, l0_count);
}
pub fn on_shutdown(&self, timeline_id: TimelineId) {
let mut guard = self.l0_count.lock().unwrap();
guard.remove(&timeline_id);
}
pub fn get_max_l0_count(&self) -> Option<(usize, TimelineId)> {
let guard = self.l0_count.lock().unwrap();
guard
.iter()
.max_by_key(|(_, count)| *count)
.map(|(timeline_id, count)| (*count, *timeline_id))
}
}
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -405,7 +358,7 @@ pub struct Tenant {
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
/// Signals the tenant compaction loop that there is L0 compaction work to be done.
pub(crate) l0_compaction_trigger: Arc<CompactionNotifier>,
pub(crate) l0_compaction_trigger: Arc<Notify>,
/// Scheduled gc-compaction tasks.
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
@@ -4289,7 +4242,7 @@ impl Tenant {
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
)),
l0_compaction_trigger: Arc::new(CompactionNotifier::new()),
l0_compaction_trigger: Arc::new(Notify::new()),
scheduled_compaction_tasks: Mutex::new(Default::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),

View File

@@ -47,7 +47,7 @@ use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, watch};
use tokio::sync::{oneshot, watch, Notify};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::critical;
@@ -151,8 +151,7 @@ use super::{
MaybeOffloaded,
};
use super::{
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, CompactionNotifier,
HeatMapTimeline,
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, HeatMapTimeline,
};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
@@ -225,7 +224,7 @@ pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
pub l0_compaction_trigger: Arc<CompactionNotifier>,
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
@@ -429,7 +428,7 @@ pub struct Timeline {
compaction_failed: AtomicBool,
/// Notifies the tenant compaction loop that there is pending L0 compaction work.
l0_compaction_trigger: Arc<CompactionNotifier>,
l0_compaction_trigger: Arc<Notify>,
/// Make sure we only have one running gc at a time.
///
@@ -1966,8 +1965,6 @@ impl Timeline {
// ... and inform any waiters for newer LSNs that there won't be any.
self.last_record_lsn.shutdown();
self.l0_compaction_trigger.on_shutdown(self.timeline_id);
if let ShutdownMode::FreezeAndFlush = mode {
let do_flush = if let Some((open, frozen)) = self
.layers
@@ -4122,8 +4119,6 @@ impl Timeline {
if l0_count >= self.get_compaction_threshold() {
self.l0_compaction_trigger.notify_one();
}
self.l0_compaction_trigger
.on_l0_update(self.timeline_id, l0_count);
// Delay the next flush to backpressure if compaction can't keep up. We delay by the
// flush duration such that the flush takes 2x as long. This is propagated up to WAL
@@ -5072,22 +5067,20 @@ impl Timeline {
// image layer generation taking too long time and blocking L0 compaction. So in this
// mode, we also inspect the current number of L0 layers and skip image layer generation
// if there are too many of them.
if let Some((max_num_of_l0_layers, timeline_id)) =
self.l0_compaction_trigger.get_max_l0_count()
{
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
* self.get_compaction_threshold();
if image_preempt_threshold != 0
&& max_num_of_l0_layers >= image_preempt_threshold
{
tracing::info!(
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {} on timeline {}",
partition.start().unwrap(), partition.end().unwrap(), max_num_of_l0_layers, timeline_id
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
}
let num_of_l0_layers = {
let layers = self.layers.read().await;
layers.layer_map()?.level0_deltas().len()
};
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
* self.get_compaction_threshold();
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
tracing::info!(
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
}
}
}
@@ -5466,13 +5459,8 @@ impl Timeline {
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
let l0_count = guard.layer_map()?.level0_deltas().len();
drop_wlock(guard);
self.l0_compaction_trigger
.on_l0_update(self.timeline_id, l0_count);
Ok(())
}

View File

@@ -496,7 +496,8 @@ pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
/// bad storage or bad configuration, and we can't fix that from inside
/// a running process.
pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
tracing::error!("Fatal I/O error: {e}: {context})");
let backtrace = std::backtrace::Backtrace::force_capture();
tracing::error!("Fatal I/O error: {e}: {context})\n{backtrace}");
std::process::abort();
}
@@ -947,13 +948,18 @@ impl VirtualFileInner {
where
Buf: tokio_epoll_uring::IoBufMut + Send,
{
let file_guard = match self.lock_file().await {
let file_guard = match self
.lock_file()
.await
.maybe_fatal_err("lock_file inside VirtualFileInner::read_at")
{
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
};
observe_duration!(StorageIoOperation::Read, {
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
if let Ok(size) = res {
STORAGE_IO_SIZE
.with_label_values(&[

View File

@@ -626,7 +626,7 @@ pub fn make_router(
failpoints_handler(r, cancel).await
})
})
.get("/v1/uzilization", |r| request_span(r, utilization_handler))
.get("/v1/utilization", |r| request_span(r, utilization_handler))
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})

View File

@@ -26,6 +26,7 @@ humantime.workspace = true
itertools.workspace = true
lasso.workspace = true
once_cell.workspace = true
governor = {version = "0.8.0"}
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true

View File

@@ -8,6 +8,7 @@ use crate::reconciler::ReconcileError;
use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
use anyhow::Context;
use futures::Future;
use governor::{Quota, RateLimiter};
use http_utils::{
endpoint::{self, auth_middleware, check_permission_with, request_span},
error::ApiError,
@@ -32,6 +33,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::{mgmt_api, BlockUnblock};
use std::num::NonZero;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -516,6 +518,14 @@ async fn handle_tenant_timeline_block_unblock_gc(
json_response(StatusCode::OK, ())
}
static PASSTHROUGH_RATE_LIMITER: std::sync::OnceLock<
RateLimiter<
TenantId,
governor::state::keyed::DefaultKeyedStateStore<TenantId>,
governor::clock::DefaultClock,
>,
> = std::sync::OnceLock::new();
async fn handle_tenant_timeline_passthrough(
service: Arc<Service>,
req: Request<Body>,
@@ -537,6 +547,19 @@ async fn handle_tenant_timeline_passthrough(
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
// Proxied requests are expected to be rare on a per-tenant basis: these are things
// like inspecting a timeline's details or doing an LSN<->timestamp mapping. Not anything
// that has high throughput.
let limiter = PASSTHROUGH_RATE_LIMITER.get_or_init(|| {
RateLimiter::new(
Quota::per_second(NonZero::new(10).unwrap()),
governor::state::keyed::DefaultKeyedStateStore::new(),
governor::clock::DefaultClock::default(),
)
});
limiter.until_key_ready(&tenant_id).await;
// Find the node that holds shard zero
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;

View File

@@ -3189,15 +3189,17 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert len(target.get_safekeepers()) == 0
sk_0 = env.safekeepers[0]
body = {
"active": True,
"id": fake_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-us-east-2",
"host": "safekeeper-333.us-east-2.aws.neon.build",
"port": 6401,
"http_port": 7676,
"host": "localhost",
"port": sk_0.port.pg,
"http_port": sk_0.port.http,
"version": 5957,
"availability_zone_id": "us-east-2b",
}
@@ -3243,6 +3245,13 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
# Ensure idempotency
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
def storcon_heartbeat():
assert env.storage_controller.log_contains(
"Heartbeat round complete for 1 safekeepers, 0 offline"
)
wait_until(storcon_heartbeat)
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]