# Problem
Before this PR, `test_pageserver_catchup_while_compute_down` would
occasionally fail with a scary-looking WARN log line:
```
WARN ephemeral_file_buffered_writer{...}:flush_attempt{attempt=1}: \
error flushing buffered writer buffer to disk, retrying after backoff err=Operation canceled (os error 125)
```
After lengthy investigation, the conclusion is that this is likely due
to a kernel bug related to io_uring async workers (io-wq) and
signals.
The main indicator is that the error only ever occurs in correlation
with pageserver shutdown, i.e., when SIGTERM is received.
There is a fix merged in 6.14 kernels
(`io-wq: backoff when retrying worker creation`).
However, even with that patch reverted, the issue is not reproducible
on 6.14, so this remains speculation.
It was ruled out that the ECANCELED is due to the executor thread
exiting before the async worker starts processing the operation.
# Solution
The workaround in this PR is to retry the operation once on
ECANCELED.
Retries are safe because the low-level io_engine operations are
idempotent.
(We don't use O_APPEND and I can't think of another flag that would make
the APIs covered by this patch not idempotent.)
# Testing
With this PR, the warn! log no longer happens on [my reproducer
setup](https://github.com/neondatabase/neon/issues/11446#issuecomment-2843015111).
And the new rate-limited `info!`-level log line informing about the
internal retry shows up instead, as expected.
# Refs
- fixes https://github.com/neondatabase/neon/issues/11446
//! A helper to rate limit operations.

use std::time::{Duration, Instant};

pub struct RateLimit {
    last: Option<Instant>,
    interval: Duration,
    dropped: u64,
}

/// Number of calls dropped since the last call that was allowed through.
pub struct RateLimitStats(u64);

impl std::fmt::Display for RateLimitStats {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{} dropped calls", self.0)
    }
}

impl RateLimit {
    pub const fn new(interval: Duration) -> Self {
        Self {
            last: None,
            interval,
            dropped: 0,
        }
    }

    /// Call `f` if the rate limit allows.
    /// Don't call it otherwise.
    pub fn call<F: FnOnce()>(&mut self, f: F) {
        self.call2(|_| f())
    }

    /// Like [`Self::call`], but `f` also receives the [`RateLimitStats`]
    /// accumulated since the last allowed call.
    pub fn call2<F: FnOnce(RateLimitStats)>(&mut self, f: F) {
        let now = Instant::now();
        match self.last {
            Some(last) if now - last <= self.interval => {
                // ratelimit
                self.dropped += 1;
            }
            _ => {
                self.last = Some(now);
                f(RateLimitStats(self.dropped));
                self.dropped = 0;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;

    #[test]
    fn basics() {
        use std::sync::atomic::Ordering::Relaxed;
        use std::time::Duration;

        use super::RateLimit;

        let called = AtomicUsize::new(0);
        let mut f = RateLimit::new(Duration::from_millis(100));

        let cl = || {
            called.fetch_add(1, Relaxed);
        };

        f.call(cl);
        assert_eq!(called.load(Relaxed), 1);
        f.call(cl);
        assert_eq!(called.load(Relaxed), 1);
        f.call(cl);
        assert_eq!(called.load(Relaxed), 1);
        std::thread::sleep(Duration::from_millis(100));
        f.call(cl);
        assert_eq!(called.load(Relaxed), 2);
        f.call(cl);
        assert_eq!(called.load(Relaxed), 2);
        std::thread::sleep(Duration::from_millis(100));
        f.call(cl);
        assert_eq!(called.load(Relaxed), 3);
    }
}