mirror of https://github.com/neondatabase/neon.git
ingest: rate-limited warning if WAL commit timestamp lags for > wait_lsn_timeout (#8839)
refs https://github.com/neondatabase/cloud/issues/13750

The logging in this commit will make it easier to detect lagging ingest. We're trusting compute timestamps; ideally we'd use SK timestamps instead, but trusting the compute timestamp is OK for now.
committed by GitHub
parent cfa45ff5ee
commit c2f8fdccd7
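In outline, the new check (warn_on_ingest_lag in the walingest.rs hunk below) converts each commit record's timestamp to a SystemTime and warns, rate-limited, when it trails wall-clock time by more than wait_lsn_timeout. A simplified standalone sketch of that shape, with a stand-in Conf type in place of the real PageServerConf:

use std::time::{Duration, SystemTime};

// Stand-in for the pageserver config; only the field this check reads.
struct Conf {
    wait_lsn_timeout: Duration,
}

fn check_lag(conf: &Conf, record_ts: SystemTime) {
    match SystemTime::now().duration_since(record_ts) {
        // Record is older than our clock: warn once lag exceeds the timeout.
        Ok(lag) if lag > conf.wait_lsn_timeout => {
            eprintln!("ingest lagging by {lag:?}");
        }
        Ok(_) => {}
        // Record timestamp is ahead of our clock: compute/pageserver drift.
        Err(e) => eprintln!("record timestamp {:?} from the future", e.duration()),
    }
}

The real implementation additionally rate-limits each message and tolerates up to 100ms of clock drift before warning about future timestamps.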
@@ -136,9 +136,9 @@ pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
 
 // Export some version independent functions that are used outside of this mod
 pub use v14::xlog_utils::encode_logical_message;
-pub use v14::xlog_utils::from_pg_timestamp;
 pub use v14::xlog_utils::get_current_timestamp;
 pub use v14::xlog_utils::to_pg_timestamp;
+pub use v14::xlog_utils::try_from_pg_timestamp;
 pub use v14::xlog_utils::XLogFileName;
 
 pub use v14::bindings::DBState_DB_SHUTDOWNED;
@@ -135,6 +135,8 @@ pub fn get_current_timestamp() -> TimestampTz {
 mod timestamp_conversions {
     use std::time::Duration;
 
+    use anyhow::Context;
+
     use super::*;
 
     const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
@@ -154,18 +156,18 @@ mod timestamp_conversions {
         }
     }
 
-    pub fn from_pg_timestamp(time: TimestampTz) -> SystemTime {
+    pub fn try_from_pg_timestamp(time: TimestampTz) -> anyhow::Result<SystemTime> {
         let time: u64 = time
             .try_into()
-            .expect("timestamp before millenium (postgres epoch)");
+            .context("timestamp before millenium (postgres epoch)")?;
         let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
         SystemTime::UNIX_EPOCH
             .checked_add(Duration::from_micros(since_unix_epoch))
-            .expect("SystemTime overflow")
+            .context("SystemTime overflow")
     }
 }
 
-pub use timestamp_conversions::{from_pg_timestamp, to_pg_timestamp};
+pub use timestamp_conversions::{to_pg_timestamp, try_from_pg_timestamp};
 
 // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
 // start_lsn must point to some previously known record boundary (beginning of
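Side note on the arithmetic: Postgres timestamps count microseconds since 2000-01-01, while SystemTime is anchored at the Unix epoch of 1970-01-01, so the conversion shifts by 946,684,800 seconds, i.e. (30 * 365 + 7) * 86,400 with 7 leap days in between. A minimal standalone sketch of the same math, with constants mirroring the diff:

use std::time::{Duration, SystemTime};

const USECS_PER_SEC: u64 = 1_000_000;
// Seconds between 1970-01-01 and 2000-01-01: (30 * 365 + 7) * 86_400.
const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 = 946_684_800;

fn main() {
    // A Postgres timestamp of 0 is exactly 2000-01-01T00:00:00Z.
    let pg_ts: u64 = 0;
    let since_unix_epoch = pg_ts + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
    let t = SystemTime::UNIX_EPOCH + Duration::from_micros(since_unix_epoch);
    let secs = t.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
    assert_eq!(secs, 946_684_800);
}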
@@ -545,14 +547,14 @@ mod tests {
     #[test]
     fn test_ts_conversion() {
         let now = SystemTime::now();
-        let round_trip = from_pg_timestamp(to_pg_timestamp(now));
+        let round_trip = try_from_pg_timestamp(to_pg_timestamp(now)).unwrap();
 
         let now_since = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
         let round_trip_since = round_trip.duration_since(SystemTime::UNIX_EPOCH).unwrap();
         assert_eq!(now_since.as_micros(), round_trip_since.as_micros());
 
         let now_pg = get_current_timestamp();
-        let round_trip_pg = to_pg_timestamp(from_pg_timestamp(now_pg));
+        let round_trip_pg = to_pg_timestamp(try_from_pg_timestamp(now_pg).unwrap());
 
         assert_eq!(now_pg, round_trip_pg);
     }
@@ -5,6 +5,15 @@ use std::time::{Duration, Instant};
 pub struct RateLimit {
     last: Option<Instant>,
     interval: Duration,
+    dropped: u64,
 }
+
+pub struct RateLimitStats(u64);
+
+impl std::fmt::Display for RateLimitStats {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} dropped calls", self.0)
+    }
+}
 
 impl RateLimit {
@@ -12,20 +21,27 @@ impl RateLimit {
         Self {
             last: None,
             interval,
+            dropped: 0,
         }
     }
 
     /// Call `f` if the rate limit allows.
     /// Don't call it otherwise.
     pub fn call<F: FnOnce()>(&mut self, f: F) {
+        self.call2(|_| f())
+    }
+
+    pub fn call2<F: FnOnce(RateLimitStats)>(&mut self, f: F) {
        let now = Instant::now();
        match self.last {
            Some(last) if now - last <= self.interval => {
                // ratelimit
+                self.dropped += 1;
            }
            _ => {
                self.last = Some(now);
-                f();
+                f(RateLimitStats(self.dropped));
+                self.dropped = 0;
            }
        }
    }
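call2 passes the callback a RateLimitStats carrying the number of invocations suppressed since the last one that ran, and call is now a thin wrapper over it. A hypothetical usage sketch, assuming the utils::rate_limit module as defined above (the loop and message are illustrative only):

use std::time::Duration;
use utils::rate_limit::RateLimit;

fn demo() {
    // At most one line per 10 seconds; suppressed calls are counted, not logged.
    let mut limit = RateLimit::new(Duration::from_secs(10));
    for _ in 0..1000 {
        limit.call2(|stats| {
            // Only the first iteration gets here; `stats` Displays as "N dropped calls".
            eprintln!("ingest is lagging ({stats})");
        });
    }
}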
@@ -871,7 +871,10 @@ async fn get_timestamp_of_lsn_handler(
 
     match result {
         Some(time) => {
-            let time = format_rfc3339(postgres_ffi::from_pg_timestamp(time)).to_string();
+            let time = format_rfc3339(
+                postgres_ffi::try_from_pg_timestamp(time).map_err(ApiError::InternalServerError)?,
+            )
+            .to_string();
             json_response(StatusCode::OK, time)
         }
         None => Err(ApiError::NotFound(
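A note on the map_err above: a tuple enum variant such as ApiError::InternalServerError(anyhow::Error) doubles as a constructor function, so it can be passed to map_err by name. A minimal sketch of the idiom with a stand-in ApiError (not the pageserver's full type):

#[derive(Debug)]
enum ApiError {
    InternalServerError(anyhow::Error),
}

fn parse_timestamp() -> anyhow::Result<String> {
    Ok("2000-01-01T00:00:00Z".to_string())
}

fn handler() -> Result<String, ApiError> {
    // The variant name is itself a fn(anyhow::Error) -> ApiError.
    let time = parse_timestamp().map_err(ApiError::InternalServerError)?;
    Ok(time)
}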
@@ -218,7 +218,7 @@ pub(crate) struct RelSizeCache {
 }
 
 pub struct Timeline {
-    conf: &'static PageServerConf,
+    pub(crate) conf: &'static PageServerConf,
     tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
 
     myself: Weak<Self>,
@@ -21,19 +21,25 @@
 //! redo Postgres process, but some records it can handle directly with
 //! bespoken Rust code.
 
+use std::time::Duration;
+use std::time::SystemTime;
+
 use pageserver_api::shard::ShardIdentity;
 use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
 use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
+use postgres_ffi::TimestampTz;
 use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
 
 use anyhow::{bail, Context, Result};
 use bytes::{Buf, Bytes, BytesMut};
 use tracing::*;
 use utils::failpoint_support;
+use utils::rate_limit::RateLimit;
 
 use crate::context::RequestContext;
 use crate::metrics::WAL_INGEST;
 use crate::pgdatadir_mapping::{DatadirModification, Version};
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::tenant::PageReconstructError;
 use crate::tenant::Timeline;
 use crate::walrecord::*;
@@ -53,6 +59,13 @@ pub struct WalIngest {
     shard: ShardIdentity,
     checkpoint: CheckPoint,
     checkpoint_modified: bool,
+    warn_ingest_lag: WarnIngestLag,
 }
+
+struct WarnIngestLag {
+    lag_msg_ratelimit: RateLimit,
+    future_lsn_msg_ratelimit: RateLimit,
+    timestamp_invalid_msg_ratelimit: RateLimit,
+}
 
 impl WalIngest {
@@ -71,6 +84,11 @@ impl WalIngest {
             shard: *timeline.get_shard_identity(),
             checkpoint,
             checkpoint_modified: false,
+            warn_ingest_lag: WarnIngestLag {
+                lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
+                future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
+                timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
+            },
         })
     }
@@ -1212,6 +1230,48 @@ impl WalIngest {
         Ok(())
     }
 
+    fn warn_on_ingest_lag(
+        &mut self,
+        conf: &crate::config::PageServerConf,
+        wal_timestmap: TimestampTz,
+    ) {
+        debug_assert_current_span_has_tenant_and_timeline_id();
+        let now = SystemTime::now();
+        let rate_limits = &mut self.warn_ingest_lag;
+        match try_from_pg_timestamp(wal_timestmap) {
+            Ok(ts) => {
+                match now.duration_since(ts) {
+                    Ok(lag) => {
+                        if lag > conf.wait_lsn_timeout {
+                            rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
+                                let lag = humantime::format_duration(lag);
+                                warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
+                            })
+                        }
+                    },
+                    Err(e) => {
+                        let delta_t = e.duration();
+                        // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
+                        // => https://www.robustperception.io/time-metric-from-the-node-exporter/
+                        const IGNORED_DRIFT: Duration = Duration::from_millis(100);
+                        if delta_t > IGNORED_DRIFT {
+                            let delta_t = humantime::format_duration(delta_t);
+                            rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
+                                warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
+                            })
+                        }
+                    }
+                };
+
+            }
+            Err(error) => {
+                rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
+                    warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
+                })
+            }
+        }
+    }
+
     /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
     ///
     async fn ingest_xact_record(
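The Err arm of now.duration_since(ts) above is how future timestamps are detected: std's SystemTime::duration_since fails when its argument is later than self, and the returned SystemTimeError carries the forward distance, which e.duration() extracts. A quick standalone illustration:

use std::time::{Duration, SystemTime};

fn main() {
    let now = SystemTime::now();
    let future = now + Duration::from_secs(5);

    // Asking "how long since a future time?" fails...
    let err = now.duration_since(future).unwrap_err();
    // ...and the error reports how far ahead the argument was.
    assert_eq!(err.duration(), Duration::from_secs(5));
}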
@@ -1228,6 +1288,8 @@
         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
 
+        self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
+
         for subxact in &parsed.subxacts {
             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
             if subxact_pageno != pageno {
@@ -2303,6 +2365,9 @@ mod tests {
         let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
 
         let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
+        let span = harness
+            .span()
+            .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
         let (tenant, ctx) = harness.load().await;
 
         let remote_initdb_path =
@@ -2354,6 +2419,7 @@
         while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
             walingest
                 .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
+                .instrument(span.clone())
                 .await
                 .unwrap();
         }
@@ -173,6 +173,11 @@ def test_backward_compatibility(
     try:
         neon_env_builder.num_safekeepers = 3
         env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
+        # check_neon_works does recovery from WAL => the compatibility snapshot's WAL is old => will log this warning
+        ingest_lag_log_line = (
+            ".*ingesting record with timestamp lagging more than wait_lsn_timeout.*"
+        )
+        env.pageserver.allowed_errors.append(ingest_lag_log_line)
         neon_env_builder.start()
 
         check_neon_works(
@@ -181,6 +186,9 @@ def test_backward_compatibility(
             sql_dump_path=compatibility_snapshot_dir / "dump.sql",
             repo_dir=env.repo_dir,
         )
 
+        env.pageserver.assert_log_contains(ingest_lag_log_line)
+
     except Exception:
         if breaking_changes_allowed:
             pytest.xfail(
@@ -62,6 +62,12 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder):
     elements_to_insert = 1_000_000
     expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive"
     env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*")
+    # we configure wait_lsn_timeout to a shorter value than the lagging_wal_timeout / walreceiver_connect_timeout
+    # => after we run into a timeout and reconnect to a different SK, more time than wait_lsn_timeout has passed
+    # ==> we log this error
+    env.pageserver.allowed_errors.append(
+        ".*ingesting record with timestamp lagging more than wait_lsn_timeout.*"
+    )
 
     insert_test_elements(env, tenant_id, start=0, count=elements_to_insert)