Compare commits

...

29 Commits

Author SHA1 Message Date
Christian Schwarz
c030745322 WIP 2025-07-20 19:12:00 +00:00
Christian Schwarz
0d2c100048 WIP 2025-07-20 18:51:36 +00:00
Christian Schwarz
ca82b739d3 WIP 2025-07-17 17:01:31 +00:00
Christian Schwarz
c9dbfd737d take two: separate propagation path 2025-07-17 15:03:45 +00:00
Christian Schwarz
5c877c95ec Revert "fetch apply_lsn as part of monitor (can have a separate task at a later point, but this will do)"
This reverts commit c8aab69ef1.
2025-07-17 14:54:23 +00:00
Christian Schwarz
0a14ff3982 Revert "switch lease renewal to use lsn from the state fed by compute monitor"
This reverts commit 9eb60807d8.
2025-07-17 14:53:38 +00:00
Christian Schwarz
a5a9e426cf Revert "make test_hot_standby_gc pass by enabling leases"
This reverts commit bd06bc54b6.
2025-07-17 14:52:55 +00:00
Christian Schwarz
0cba9c00c2 Revert "demo that this solution creates too many leases"
This reverts commit e850cc94b5.
2025-07-17 14:52:48 +00:00
Christian Schwarz
ecd189b5b7 Revert "WIP Leases struct + refactor"
This reverts commit 651a321886.
2025-07-17 14:52:37 +00:00
Christian Schwarz
b2ba489cc6 I think modelling the RO replicas as advancing leases will not work at all because our GC is too dumb; the RO replica's lease will inhibit GC of any layer below
Should have done that homework earlier
2025-07-17 14:50:57 +00:00
Christian Schwarz
651a321886 WIP Leases struct + refactor 2025-07-17 14:50:50 +00:00
Christian Schwarz
f35dc15598 Revert "theorem: https://databricks.slack.com/archives/C09254R641L/p1752739636785699?thread_ts=1752738134.504099&cid=C09254R641L"
This reverts commit e4b514e344.
2025-07-17 08:10:48 +00:00
Christian Schwarz
e4b514e344 theorem: https://databricks.slack.com/archives/C09254R641L/p1752739636785699?thread_ts=1752738134.504099&cid=C09254R641L 2025-07-17 08:10:32 +00:00
Christian Schwarz
e850cc94b5 demo that this solution creates too many leases 2025-07-17 07:14:11 +00:00
Christian Schwarz
bd06bc54b6 make test_hot_standby_gc pass by enabling leases 2025-07-09 23:41:30 +00:00
Christian Schwarz
9eb60807d8 switch lease renewal to use lsn from the state fed by compute monitor 2025-07-09 23:41:30 +00:00
Christian Schwarz
c8aab69ef1 fetch apply_lsn as part of monitor (can have a separate task at a later point, but this will do) 2025-07-09 23:39:17 +00:00
Christian Schwarz
ba4778831b Merge remote-tracking branch 'origin/main' into problame/standby-horizon-removal-poc-rip-out 2025-07-09 11:10:53 +00:00
Christian Schwarz
6e5a83d34d Merge remote-tracking branch 'origin/problame/lease-deadline-tests' into problame/standby-horizon-removal-poc-rip-out 2025-07-04 17:00:51 +02:00
Christian Schwarz
9c8d55c529 rip out timeline_gc(ignore_lease_deadline=...) flag, not needed anymore 2025-07-04 16:55:18 +02:00
Christian Schwarz
2cd1af2979 use lsn_lease_length setting to default Python & rust tests to zero initial lease deadline 2025-07-04 16:50:38 +02:00
Christian Schwarz
c5ef09d0b0 fixup eb267de255 2025-07-04 16:50:24 +02:00
Christian Schwarz
0d01eada52 Merge branch 'problame/lease-deadline-tests' into problame/standby-horizon-removal-poc-rip-out 2025-07-02 16:02:29 +02:00
Christian Schwarz
eb267de255 use initial lease period based on config
This change would make sense as a standalone commit.
2025-07-02 15:58:47 +02:00
Christian Schwarz
8143270c4f fix test_readonly_node_gc, it (among other things) tests the lease deadline 2025-07-02 15:58:38 +02:00
Christian Schwarz
5d499f62cd fix(tests): periodic and immediate gc is effectively a no-op in tests
The introduction of the default lease deadline[^1] feature makes it so
that after PS restart, `.timeline_gc()` calls in Python tests are no-ops
for 10 minute after pageserver startup: the `gc_iteration()` bails with
`Skipping GC because lsn lease deadline is not reached`.

I did some impact analysis in the following PR. About 30 Python tests
are affected:
- https://github.com/neondatabase/neon/pull/12411

Rust tests that don't explicitly enable periodic GC or invoke GC manually
are unaffected because we disable periodic GC by default in
the `TenantHarness`'s tenant config.
Two tests explicitly did `start_paused=true` + `tokio::time::advance()`,
but it would add cognitive and code bloat to each existing and future
test case that uses TenantHarness if we take that route.

So, this PR disables the default lease deadline feature in all tests.

refs
- [^1]: PR that introduced default lease deadline: https://github.com/neondatabase/neon/pull/9055/files
- fixes https://databricks.atlassian.net/browse/LKB-92
2025-07-02 11:56:39 +02:00
Christian Schwarz
3a6094263d i just ran test_hot_standby_feedback and it didn't fail o_O 2025-06-30 21:17:24 +02:00
Christian Schwarz
66ab1047ef undo some more of the changes from https://github.com/neondatabase/neon/pull/7368/files 2025-06-30 21:16:03 +02:00
Christian Schwarz
18cd307461 fully rip it out, see which tests break 2025-06-30 19:13:49 +02:00
25 changed files with 716 additions and 250 deletions

1
Cargo.lock generated
View File

@@ -1348,6 +1348,7 @@ dependencies = [
"p256 0.13.2",
"pageserver_page_api",
"postgres",
"postgres-types",
"postgres_initdb",
"postgres_versioninfo",
"regex",

View File

@@ -67,6 +67,7 @@ uuid.workspace = true
walkdir.workspace = true
x509-cert.workspace = true
postgres-types.workspace = true
postgres_versioninfo.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -46,7 +46,6 @@ use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
use crate::monitor::launch_monitor;
use crate::pg_helpers::*;
use crate::pgbouncer::*;
use crate::rsyslog::{
PostgresLogsRsyslogConfig, configure_audit_rsyslog, configure_postgres_logs_export,
@@ -57,6 +56,7 @@ use crate::swap::resize_swap;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::tls::watch_cert_for_changes;
use crate::{config, extension_server, local_proxy};
use crate::{pg_helpers::*, ro_replica};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
@@ -131,6 +131,8 @@ pub struct ComputeNode {
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
pub compute_ctl_config: ComputeCtlConfig,
pub(crate) ro_replica: Arc<ro_replica::GlobalState>,
/// Handle to the extension stats collection task
extension_stats_task: TaskHandle,
lfc_offload_task: TaskHandle,
@@ -438,6 +440,7 @@ impl ComputeNode {
compute_ctl_config: config.compute_ctl_config,
extension_stats_task: Mutex::new(None),
lfc_offload_task: Mutex::new(None),
ro_replica: Arc::new(ro_replica::GlobalState::default()),
})
}
@@ -490,6 +493,8 @@ impl ComputeNode {
launch_lsn_lease_bg_task_for_static(&this);
ro_replica::spawn_bg_task(Arc::clone(&this));
// We have a spec, start the compute
let mut delay_exit = false;
let mut vm_monitor = None;

View File

@@ -23,9 +23,11 @@ pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pgbouncer;
pub(crate) mod ro_replica;
pub mod rsyslog;
pub mod spec;
mod spec_apply;
pub mod swap;
pub mod sync_sk;
pub mod tls;
pub mod pageserver_client;

View File

@@ -5,15 +5,15 @@ use std::time::{Duration, SystemTime};
use anyhow::{Result, bail};
use compute_api::spec::{ComputeMode, PageserverProtocol};
use itertools::Itertools as _;
use pageserver_page_api as page_api;
use postgres::{NoTls, SimpleQueryMessage};
use tracing::{info, warn};
use utils::id::{TenantId, TimelineId};
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::{ShardCount, ShardNumber, TenantShardId};
use utils::shard::TenantShardId;
use crate::compute::ComputeNode;
use crate::pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect};
/// Spawns a background thread to periodically renew LSN leases for static compute.
/// Do nothing if the compute is not in static mode.
@@ -31,7 +31,7 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
thread::spawn(move || {
let _entered = span.entered();
if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
if let Err(e) = lsn_lease_bg_task(compute, timeline_id, lsn) {
// TODO: might need stronger error feedback than logging an warning.
warn!("Exited with error: {e}");
}
@@ -39,14 +39,9 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
}
/// Renews lsn lease periodically so static compute are not affected by GC.
fn lsn_lease_bg_task(
compute: Arc<ComputeNode>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<()> {
fn lsn_lease_bg_task(compute: Arc<ComputeNode>, timeline_id: TimelineId, lsn: Lsn) -> Result<()> {
loop {
let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
let valid_until = acquire_lsn_lease_with_retry(&compute, timeline_id, lsn)?;
let valid_duration = valid_until
.duration_since(SystemTime::now())
.unwrap_or(Duration::ZERO);
@@ -68,7 +63,6 @@ fn lsn_lease_bg_task(
/// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
fn acquire_lsn_lease_with_retry(
compute: &Arc<ComputeNode>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<SystemTime> {
@@ -78,17 +72,13 @@ fn acquire_lsn_lease_with_retry(
loop {
// Note: List of pageservers is dynamic, need to re-read configs before each attempt.
let (connstrings, auth) = {
let shards = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
(
spec.pageserver_connstr.clone(),
spec.storage_auth_token.clone(),
)
let pspec = state.pspec.as_ref().expect("spec must be set");
pageserver_connstrings_for_connect(pspec)
};
let result =
try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
let result = try_acquire_lsn_lease(shards, timeline_id, lsn);
match result {
Ok(Some(res)) => {
return Ok(res);
@@ -112,33 +102,32 @@ fn acquire_lsn_lease_with_retry(
/// Tries to acquire LSN leases on all Pageserver shards.
fn try_acquire_lsn_lease(
connstrings: &str,
auth: Option<&str>,
tenant_id: TenantId,
shards: Vec<ConnectInfo>,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let connstrings = connstrings.split(',').collect_vec();
let shard_count = connstrings.len();
let mut leases = Vec::new();
for (shard_number, &connstring) in connstrings.iter().enumerate() {
let tenant_shard_id = match shard_count {
0 | 1 => TenantShardId::unsharded(tenant_id),
shard_count => TenantShardId {
tenant_id,
shard_number: ShardNumber(shard_number as u8),
shard_count: ShardCount::new(shard_count as u8),
},
};
let lease = match PageserverProtocol::from_connstring(connstring)? {
PageserverProtocol::Libpq => {
acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
PageserverProtocol::Grpc => {
acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
for ConnectInfo {
tenant_shard_id,
connstring,
auth,
} in shards
{
let lease = match PageserverProtocol::from_connstring(&connstring)? {
PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
&connstring,
auth.as_deref(),
tenant_shard_id,
timeline_id,
lsn,
)?,
PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
&connstring,
auth.as_deref(),
tenant_shard_id,
timeline_id,
lsn,
)?,
};
leases.push(lease);
}

View File

@@ -4,9 +4,10 @@ use std::time::Duration;
use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use compute_api::spec::{ComputeFeature, ComputeMode};
use postgres::{Client, NoTls};
use tracing::{Level, error, info, instrument, span};
use utils::lsn::Lsn;
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
@@ -344,6 +345,42 @@ impl ComputeMonitor {
}
}
let mode: ComputeMode = self
.compute
.state
.lock()
.unwrap()
.pspec
.as_ref()
.expect("we launch ComputeMonitor only after we received a spec")
.spec
.mode;
match mode {
// TODO: can the .spec.mode ever change? if it can (e.g. secondary promote to primary)
// then we should make sure that lsn_lease_state transitions back to None so we stop renewing it.
ComputeMode::Primary => (),
ComputeMode::Static(_) => (),
ComputeMode::Replica => {
// TODO: instead of apply_lsn, use min inflight request LSN
match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) {
Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") {
Ok(apply_lsn) => {
let apply_lsn = Lsn(apply_lsn.into());
self.compute
.ro_replica
.update_min_inflight_request_lsn(apply_lsn);
}
Err(e) => {
anyhow::bail!("parse apply_lsn: {e}");
}
},
Err(e) => {
anyhow::bail!("query apply_lsn: {e}");
}
}
}
}
Ok(())
}
}

View File

@@ -0,0 +1,38 @@
use itertools::Itertools;
use utils::shard::{ShardCount, ShardNumber, TenantShardId};
use crate::compute::ParsedSpec;
pub struct ConnectInfo {
pub tenant_shard_id: TenantShardId,
pub connstring: String,
pub auth: Option<String>,
}
pub fn pageserver_connstrings_for_connect(pspec: &ParsedSpec) -> Vec<ConnectInfo> {
let connstrings = &pspec.pageserver_connstr;
let auth = pspec.storage_auth_token.clone();
let connstrings = connstrings.split(',').collect_vec();
let shard_count = connstrings.len();
let mut infos = Vec::with_capacity(connstrings.len());
for (shard_number, connstring) in connstrings.iter().enumerate() {
let tenant_shard_id = match shard_count {
0 | 1 => TenantShardId::unsharded(pspec.tenant_id),
shard_count => TenantShardId {
tenant_id: pspec.tenant_id,
shard_number: ShardNumber(shard_number as u8),
shard_count: ShardCount::new(shard_count as u8),
},
};
infos.push(ConnectInfo {
tenant_shard_id,
connstring: connstring.to_string(),
auth: auth.clone(),
});
}
infos
}

View File

@@ -0,0 +1,297 @@
use std::{
pin::Pin,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};
use compute_api::spec::PageserverProtocol;
use futures::{StreamExt, stream::FuturesUnordered};
use postgres::SimpleQueryMessage;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info, info_span, instrument, warn};
use utils::{backoff::retry, id::TimelineId, lsn::Lsn};
use crate::{
compute::ComputeNode,
pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect},
};
#[derive(Default)]
pub(crate) struct GlobalState {
min_inflight_request_lsn: tokio::sync::watch::Sender<Option<Lsn>>,
}
impl GlobalState {
pub fn update_min_inflight_request_lsn(&self, update: Lsn) {
self.min_inflight_request_lsn.send_if_modified(|value| {
let modified = *value != Some(update);
if let Some(value) = *value && value > update {
warn!(current=%value, new=%update, "min inflight request lsn moving backwards, this should not happen, bug in communicator");
}
*value = Some(update);
modified
});
}
}
pub fn spawn_bg_task(compute: Arc<ComputeNode>) {
std::thread::spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(bg_task(compute))
});
}
#[instrument(name = "standby_horizon_lease", skip_all, fields(lease_id))]
async fn bg_task(compute: Arc<ComputeNode>) {
// Use a lease_id that is globally unique to this process to maximize attribution precision & log correlation.
let lease_id = format!("v1-{}-{}", compute.params.compute_id, std::process::id());
tracing::Span::current().record("lease_id", tracing::field::display(&lease_id));
// Wait until we have the first value.
// Allows us to simply .unwrap() later because it never transitions back to None.
info!("waiting for first lease lsn to be fetched from postgres");
let mut min_inflight_request_lsn_changed =
compute.ro_replica.min_inflight_request_lsn.subscribe();
min_inflight_request_lsn_changed.mark_changed(); // it could have been set already
min_inflight_request_lsn_changed
.wait_for(|value| value.is_some())
.await
.expect("we never drop the sender");
// React to connstring changes. Sadly there is no async API for this yet.
let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(None);
std::thread::spawn({
let compute = Arc::clone(&compute);
move || {
loop {
compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::MAX);
let new = compute
.state
.lock()
.unwrap()
.pspec
.as_ref()
.and_then(|pspec| pspec.spec.pageserver_connstring.clone());
connstr_watch_tx.send_if_modified(|existing| {
if &new != existing {
*existing = new;
true
} else {
false
}
});
}
}
});
let mut obtained = ObtainedLease {
lsn: Lsn(0),
nearest_expiration: SystemTime::UNIX_EPOCH,
};
loop {
let valid_duration = obtained
.nearest_expiration
.duration_since(SystemTime::now())
.unwrap_or_default();
// Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
let sleep_duration = valid_duration
.saturating_sub(Duration::from_secs(60))
.max(valid_duration / 2);
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {
info!("updating because lease is going to expire soon");
}
_ = connstr_watch_rx.changed() => {
info!("updating due to changed pageserver_connstr")
}
_ = async {
// debounce; TODO make this lower in tests
tokio::time::sleep(Duration::from_secs(10)).await;
// every 10 GiB; TODO make this tighter in tests?
let max_horizon_lag = 10 * (1<<30);
min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > obtained.lsn.0 + max_horizon_lag).await
} => {
info!(%obtained.lsn, "updating due to max horizon lag");
}
}
// retry forever
let compute = Arc::clone(&compute);
let lease_id = lease_id.clone();
obtained = retry(
|| attempt(lease_id.clone(), &compute),
|_| false,
0,
u32::MAX, // forever
"update standby_horizon position in pageserver",
// There is no cancellation story in compute_ctl
&CancellationToken::new(),
)
.await
.expect("is_permanent returns false, so, retry always returns Some")
.expect("u32::MAX exceeded");
}
}
struct ObtainedLease {
lsn: Lsn,
nearest_expiration: SystemTime,
}
async fn attempt(lease_id: String, compute: &Arc<ComputeNode>) -> anyhow::Result<ObtainedLease> {
let (shards, timeline_id) = {
let state = compute.state.lock().unwrap();
let pspec = state.pspec.as_ref().expect("spec must be set");
(pageserver_connstrings_for_connect(pspec), pspec.timeline_id)
};
let lsn = compute
.ro_replica
.min_inflight_request_lsn
.borrow()
.expect("we only call this function once it has been transitioned to Some");
let mut futs = FuturesUnordered::new();
for connect_info in shards {
let logging_span = info_span!(
"attempt_one",
tenant_id=%connect_info.tenant_shard_id.tenant_id,
shard_id=%connect_info.tenant_shard_id.shard_slug(),
timeline_id=%timeline_id,
);
let logging_wrapper =
|fut: Pin<Box<dyn Future<Output = anyhow::Result<Option<SystemTime>>>>>| {
async move {
// TODO: timeout?
match fut.await {
Ok(Some(v)) => {
info!("lease obtained");
Ok(Some(v))
}
Ok(None) => {
error!("pageserver rejected our request");
Ok(None)
}
Err(err) => {
error!("communication failure: {err:?}");
Err(())
}
}
}
.instrument(logging_span)
};
let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? {
PageserverProtocol::Libpq => logging_wrapper(Box::pin(attempt_one_libpq(
connect_info,
timeline_id,
lease_id.clone(),
lsn,
))),
PageserverProtocol::Grpc => logging_wrapper(Box::pin(attempt_one_grpc(
connect_info,
timeline_id,
lease_id.clone(),
lsn,
))),
};
futs.push(fut);
}
let mut errors = 0;
let mut nearest_expiration = None;
while let Some(res) = futs.next().await {
match res {
Ok(Some(expiration)) => {
let nearest_expiration = nearest_expiration.get_or_insert(expiration);
*nearest_expiration = std::cmp::min(*nearest_expiration, expiration);
}
Ok(None) | Err(()) => {
// the logging wrapper does the logging
errors += 1;
}
}
}
if errors > 0 {
return Err(anyhow::anyhow!(
"failed to advance standby_horizon for {errors} shards, check logs for details"
));
}
match nearest_expiration {
Some(nearest_expiration) => Ok(ObtainedLease {
lsn,
nearest_expiration,
}),
None => Err(anyhow::anyhow!("pageservers connstrings is empty")), // this probably can't happen
}
}
async fn attempt_one_libpq(
connect_info: ConnectInfo,
timeline_id: TimelineId,
lease_id: String,
lsn: Lsn,
) -> anyhow::Result<Option<SystemTime>> {
let ConnectInfo {
tenant_shard_id,
connstring,
auth,
} = connect_info;
let mut config = tokio_postgres::Config::from_str(&connstring)?;
if let Some(auth) = auth {
config.password(auth);
}
let (client, conn) = config.connect(postgres::NoTls).await?;
tokio::spawn(conn);
let cmd = format!("lease standby_horizon {tenant_shard_id} {timeline_id} {lease_id} {lsn} ");
let res = client.simple_query(&cmd).await?;
let msg = match res.first() {
Some(msg) => msg,
None => anyhow::bail!("empty response"),
};
let row = match msg {
SimpleQueryMessage::Row(row) => row,
_ => anyhow::bail!("expected row message type"),
};
// Note: this will be None if a lease is explicitly not granted.
let Some(expiration) = row.get("expiration") else {
return Ok(None);
};
let expiration =
SystemTime::UNIX_EPOCH.checked_add(Duration::from_millis(u64::from_str(expiration)?));
Ok(expiration)
}
async fn attempt_one_grpc(
connect_info: ConnectInfo,
timeline_id: TimelineId,
lease_id: String,
lsn: Lsn,
) -> anyhow::Result<Option<SystemTime>> {
let ConnectInfo {
tenant_shard_id,
connstring,
auth,
} = connect_info;
let mut client = pageserver_page_api::Client::connect(
connstring.to_string(),
tenant_shard_id.tenant_id,
timeline_id,
tenant_shard_id.to_index(),
auth.map(String::from),
None,
)
.await?;
let req = pageserver_page_api::LeaseStandbyHorizonRequest { lease_id, lsn };
match client.lease_standby_horizon(req).await {
Ok(expires) => Ok(Some(expires)),
// Lease couldn't be acquired
Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
Err(err) => Err(err.into()),
}
}

View File

@@ -96,50 +96,13 @@ impl HotStandbyFeedback {
}
}
/// Standby status update
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
pub reply_requested: bool,
}
impl StandbyReply {
pub fn empty() -> Self {
StandbyReply {
write_lsn: Lsn::INVALID,
flush_lsn: Lsn::INVALID,
apply_lsn: Lsn::INVALID,
reply_ts: 0,
reply_requested: false,
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyFeedback {
pub reply: StandbyReply,
pub hs_feedback: HotStandbyFeedback,
}
impl StandbyFeedback {
pub fn empty() -> Self {
StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: HotStandbyFeedback::empty(),
}
}
}
/// Receiver is either pageserver or regular standby, which have different
/// feedbacks.
/// Used as both model and internally.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ReplicationFeedback {
Pageserver(PageserverFeedback),
Standby(StandbyFeedback),
Standby(HotStandbyFeedback),
}
/// Uniquely identifies a WAL service connection. Logged in spans for
@@ -266,9 +229,6 @@ pub struct SkTimelineInfo {
pub http_connstr: Option<String>,
#[serde(default)]
pub https_connstr: Option<String>,
// Minimum of all active RO replicas flush LSN
#[serde(default = "lsn_invalid")]
pub standby_horizon: Lsn,
}
#[derive(Debug, Clone, Deserialize, Serialize)]

View File

@@ -70,6 +70,18 @@ service PageService {
// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
// collect the LSN until the lease expires. Must be acquired on all relevant shards.
rpc LeaseLsn (LeaseLsnRequest) returns (LeaseLsnResponse);
// Upserts a standby_horizon lease. RO replicas rely on this type of lease.
// In slightly more detail: RO replicas always lag to some degree behind the
// primary, and request pages at their respective apply LSN. The standby horizon mechanism
// ensures that the Pageserver does not garbage-collect old page versions in
// the interval between `min(valid standby horizon leases)` and the most recent page version.
//
// Each RO replica call this method continuously as it applies more WAL.
// It identifies its lease through an opaque "lease_id" across these requests.
// The response contains the lease expiration time.
// Status `FailedPrecondition` is returned if the lease cannot be granted.
rpc LeaseStandbyHorizon(LeaseStandbyHorizonRequest) returns (LeaseStandbyHorizonResponse);
}
// The LSN a request should read at.
@@ -272,3 +284,16 @@ message LeaseLsnResponse {
// The lease expiration time.
google.protobuf.Timestamp expires = 1;
}
// Request for LeaseStandbyHorizon rpc.
// The lease_id identifies the lease in subsequent requests.
// The lsn must be monotonic; the request will fail if it is not.
message LeaseStandbyHorizonRequest {
string lease_id = 1;
uint64 lsn = 2;
}
// Response for the success case of LeaseStandbyHorizon rpc.
message LeaseStandbyHorizonResponse {
google.protobuf.Timestamp expiration = 1;
}

View File

@@ -143,6 +143,12 @@ impl Client {
let resp = self.inner.lease_lsn(req).await?.into_inner();
Ok(resp.try_into()?)
}
pub async fn lease_standby_horizon(&mut self, req: LeaseStandbyHorizonRequest) -> tonic::Result<LeaseStandbyHorizonResponse> {
let req = proto::LeaseStandbyHorizonRequest::from(req);
let resp = self.inner.lease_standby_horizon(req).await?.into_inner();
Ok(resp.try_into()?)
}
}
/// Adds authentication metadata to gRPC requests.

View File

@@ -755,3 +755,64 @@ impl From<LeaseLsnResponse> for proto::LeaseLsnResponse {
}
}
}
pub struct LeaseStandbyHorizonRequest {
pub lease_id: String,
pub lsn: Lsn,
}
impl TryFrom<proto::LeaseStandbyHorizonRequest> for LeaseStandbyHorizonRequest {
type Error = ProtocolError;
fn try_from(pb: proto::LeaseStandbyHorizonRequest) -> Result<Self, Self::Error> {
if pb.lsn == 0 {
return Err(ProtocolError::Missing("lsn"));
}
if pb.lease_id.len() == 0 {
return Err(ProtocolError::Invalid("lease_id", pb.lease_id));
}
Ok(Self {
lease_id: pb.lease_id,
lsn: Lsn(pb.lsn),
})
}
}
impl From<LeaseStandbyHorizonRequest> for proto::LeaseStandbyHorizonRequest {
fn from(request: LeaseStandbyHorizonRequest) -> Self {
Self {
lease_id: request.lease_id,
lsn: request.lsn.0,
}
}
}
/// Lease expiration time. If the lease could not be granted because the LSN has already been
/// garbage collected, a FailedPrecondition status will be returned instead.
pub type LeaseStandbyHorizonResponse = SystemTime;
impl TryFrom<proto::LeaseStandbyHorizonResponse> for LeaseStandbyHorizonResponse {
type Error = ProtocolError;
fn try_from(pb: proto::LeaseStandbyHorizonResponse) -> Result<Self, Self::Error> {
let expiration = pb.expiration.ok_or(ProtocolError::Missing("expiration"))?;
UNIX_EPOCH
.checked_add(Duration::new(
expiration.seconds as u64,
expiration.nanos as u32,
))
.ok_or_else(|| ProtocolError::invalid("expiration", expiration))
}
}
impl From<LeaseStandbyHorizonResponse> for proto::LeaseStandbyHorizonResponse {
fn from(response: LeaseStandbyHorizonResponse) -> Self {
let expiration = response.duration_since(UNIX_EPOCH).unwrap_or_default();
Self {
expiration: Some(prost_types::Timestamp {
seconds: expiration.as_secs() as i64,
nanos: expiration.subsec_nanos() as i32,
}),
}
}
}

View File

@@ -720,15 +720,6 @@ static TIMELINE_ARCHIVE_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static STANDBY_HORIZON: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_standby_horizon",
"Standby apply LSN for which GC is hold off, by timeline.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_resident_physical_size",
@@ -2327,6 +2318,7 @@ pub(crate) enum ComputeCommandKind {
Basebackup,
Fullbackup,
LeaseLsn,
LeaseStandbyHorizon,
}
pub(crate) struct ComputeCommandCounters {
@@ -3228,7 +3220,6 @@ pub(crate) struct TimelineMetrics {
pub pitr_history_size: UIntGauge,
pub archival_size: UIntGauge,
pub layers_per_read: Histogram,
pub standby_horizon_gauge: IntGauge,
pub resident_physical_size_gauge: UIntGauge,
pub visible_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
@@ -3330,9 +3321,6 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let standby_horizon_gauge = STANDBY_HORIZON
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
@@ -3416,7 +3404,6 @@ impl TimelineMetrics {
pitr_history_size,
archival_size,
layers_per_read,
standby_horizon_gauge,
resident_physical_size_gauge,
visible_physical_size_gauge,
current_logical_size_gauge,
@@ -3559,7 +3546,6 @@ impl TimelineMetrics {
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);

View File

@@ -76,6 +76,7 @@ use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
debug_assert_current_span_has_tenant_id,
};
use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
use crate::tenant::mgr::{
@@ -2218,7 +2219,7 @@ impl PageServerHandler {
valid_until_str.as_deref().unwrap_or("<unknown>")
);
let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
let bytes: Option<&[u8]> = valid_until_str.as_ref().map(|x| x.as_bytes());
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"valid_until",
@@ -2228,6 +2229,56 @@ impl PageServerHandler {
Ok(())
}
#[instrument(skip_all, fields(shard_id, %lsn))]
async fn handle_lease_standby_horizon<IO>(
&mut self,
pgb: &mut PostgresBackend<IO>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lease_id: String,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
debug_assert_current_span_has_tenant_id();
let timeline = self
.timeline_handles
.as_mut()
.unwrap()
.get(
tenant_shard_id.tenant_id,
timeline_id,
ShardSelector::Known(tenant_shard_id.to_index()),
)
.await?;
set_tracing_field_shard_id(&timeline);
let result: Option<SystemTime> = timeline
.lease_standby_horizon(lease_id, lsn, ctx)
.inspect_err(|e| {
warn!("{e}");
})
.ok();
// Encode result as Option<millis since epoch>
let bytes = result.map(|t| {
t.duration_since(SystemTime::UNIX_EPOCH)
.expect("we wouldn't allow a lease at epoch, system time would be horribly off")
.as_millis()
.to_string()
.into_bytes()
});
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"expiration",
)]))?
.write_message_noflush(&BeMessage::DataRow(&[bytes.as_deref()]))?;
Ok(())
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_rel_exists_request(
timeline: &Timeline,
@@ -2718,6 +2769,14 @@ struct LeaseLsnCmd {
lsn: Lsn,
}
#[derive(Debug, Clone, Eq, PartialEq)]
struct LeaseStandbyHorizonCmd {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lease_id: String,
lsn: Lsn,
}
#[derive(Debug, Clone, Eq, PartialEq)]
enum PageServiceCmd {
Set,
@@ -2725,6 +2784,7 @@ enum PageServiceCmd {
BaseBackup(BaseBackupCmd),
FullBackup(FullBackupCmd),
LeaseLsn(LeaseLsnCmd),
LeaseStandbyHorizon(LeaseStandbyHorizonCmd),
}
impl PageStreamCmd {
@@ -2874,6 +2934,31 @@ impl LeaseLsnCmd {
}
}
impl LeaseStandbyHorizonCmd {
fn parse(query: &str) -> anyhow::Result<Self> {
let parameters = query.split_whitespace().collect_vec();
if parameters.len() != 4 {
bail!(
"invalid number of parameters for lease lsn command: {}",
query
);
}
let tenant_shard_id = TenantShardId::from_str(parameters[0])
.with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
let timeline_id = TimelineId::from_str(parameters[1])
.with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
let lease_id = parameters[2].to_string();
let standby_horizon = Lsn::from_str(parameters[3])
.with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
Ok(Self {
tenant_shard_id,
timeline_id,
lease_id,
lsn: standby_horizon,
})
}
}
impl PageServiceCmd {
fn parse(query: &str) -> anyhow::Result<Self> {
let query = query.trim();
@@ -2898,6 +2983,10 @@ impl PageServiceCmd {
let cmd2 = cmd2.to_ascii_lowercase();
if cmd2 == "lsn" {
Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
} else if cmd2 == "standby_horizon" {
Ok(Self::LeaseStandbyHorizon(LeaseStandbyHorizonCmd::parse(
other,
)?))
} else {
bail!("invalid lease command: {cmd}");
}
@@ -3161,6 +3250,45 @@ where
}
};
}
PageServiceCmd::LeaseStandbyHorizon(LeaseStandbyHorizonCmd {
tenant_shard_id,
timeline_id,
lease_id,
lsn,
}) => {
tracing::Span::current()
.record("tenant_id", field::display(tenant_shard_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_shard_id.tenant_id))?;
COMPUTE_COMMANDS_COUNTERS
.for_command(ComputeCommandKind::LeaseStandbyHorizon)
.inc();
match self
.handle_lease_standby_horizon(
pgb,
tenant_shard_id,
timeline_id,
lease_id,
lsn,
&ctx,
)
.await
{
Ok(()) => {
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
}
Err(e) => {
error!("error obtaining standby_horizon lease for {lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(
&e.to_string(),
Some(e.pg_error_code()),
))?
}
};
}
}
Ok(())
@@ -3801,6 +3929,28 @@ impl proto::PageService for GrpcPageServiceHandler {
Ok(tonic::Response::new(expires.into()))
}
#[instrument(skip_all, fields(lease_id, lsn))]
async fn lease_standby_horizon(
&self,
req: tonic::Request<proto::LeaseStandbyHorizonRequest>,
) -> Result<tonic::Response<proto::LeaseStandbyHorizonResponse>, tonic::Status> {
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);
// Validate and convert the request, and decorate the span.
let page_api::LeaseStandbyHorizonRequest { lease_id, lsn } = req.into_inner().try_into()?;
span_record!(lease_id=%lease_id, lsn=%lsn);
// Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
let expiration = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) {
Ok(expiration) => expiration,
Err(err) => return Err(tonic::Status::failed_precondition(format!("{err:#}"))),
};
Ok(tonic::Response::new(expiration.into()))
}
}
/// gRPC middleware layer that handles observability concerns:

View File

@@ -248,8 +248,6 @@ pub struct Timeline {
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
pub(crate) standby_horizon: AtomicLsn,
// WAL redo manager. `None` only for broken tenants.
walredo_mgr: Option<Arc<super::WalRedoManager>>,
@@ -1860,6 +1858,15 @@ impl Timeline {
Ok(lease)
}
pub(crate) fn lease_standby_horizon(
&self,
lease_id: String,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<SystemTime> {
todo!()
}
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
/// Returns the flush request ID which can be awaited with wait_flush_completion().
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
@@ -3142,8 +3149,6 @@ impl Timeline {
l0_compaction_trigger: resources.l0_compaction_trigger,
gc_lock: tokio::sync::Mutex::default(),
standby_horizon: AtomicLsn::new(0),
pagestream_throttle: resources.pagestream_throttle,
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
@@ -6521,32 +6526,7 @@ impl Timeline {
)
};
let mut new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());
let standby_horizon = self.standby_horizon.load();
// Hold GC for the standby, but as a safety guard do it only within some
// reasonable lag.
if standby_horizon != Lsn::INVALID {
if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) {
const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
trace!("holding off GC for standby apply LSN {}", standby_horizon);
} else {
warn!(
"standby is lagging for more than {}MB, not holding gc for it",
MAX_ALLOWED_STANDBY_LAG / 1024 / 1024
)
}
}
}
// Reset standby horizon to ignore it if it is not updated till next GC.
// It is an easy way to unset it when standby disappears without adding
// more conf options.
self.standby_horizon.store(Lsn::INVALID);
self.metrics
.standby_horizon_gauge
.set(Lsn::INVALID.0 as i64);
let new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());
let res = self
.gc_timeline(

View File

@@ -2966,7 +2966,7 @@ impl Timeline {
}
/// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
/// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
/// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff).
/// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
/// here.
pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
@@ -2975,9 +2975,6 @@ impl Timeline {
gc_info.min_cutoff()
};
// TODO: standby horizon should use leases so we don't really need to consider it here.
// let watermark = watermark.min(self.standby_horizon.load());
// TODO: ensure the child branches will not use anything below the watermark, or consider
// them when computing the watermark.
gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())

View File

@@ -729,7 +729,6 @@ impl ConnectionManagerState {
commit_lsn: info.commit_lsn,
safekeeper_connstr: info.safekeeper_connstr,
availability_zone: info.availability_zone,
standby_horizon: info.standby_horizon,
}
}
MessageType::SafekeeperDiscoveryResponse => {
@@ -750,21 +749,6 @@ impl ConnectionManagerState {
WALRECEIVER_BROKER_UPDATES.inc();
trace!(
"safekeeper info update: standby_horizon(cutoff)={}",
timeline_update.standby_horizon
);
if timeline_update.standby_horizon != 0 {
// ignore reports from safekeepers not connected to replicas
self.timeline
.standby_horizon
.store(Lsn(timeline_update.standby_horizon));
self.timeline
.metrics
.standby_horizon_gauge
.set(timeline_update.standby_horizon as i64);
}
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
let old_entry = self.wal_stream_candidates.insert(
new_safekeeper_id,
@@ -1148,7 +1132,6 @@ mod tests {
commit_lsn,
safekeeper_connstr: safekeeper_connstr.to_owned(),
availability_zone: None,
standby_horizon: 0,
},
latest_update,
}

View File

@@ -199,7 +199,6 @@ async fn discover_loop(
commit_lsn: sk_info.commit_lsn,
safekeeper_connstr: sk_info.safekeeper_connstr,
availability_zone: sk_info.availability_zone,
standby_horizon: 0,
};
// note this is a blocking call

View File

@@ -554,7 +554,6 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
backup_lsn: sk_info.backup_lsn.0,
local_start_lsn: sk_info.local_start_lsn.0,
availability_zone: None,
standby_horizon: sk_info.standby_horizon.0,
};
let global_timelines = get_global_timelines(&request);

View File

@@ -16,8 +16,7 @@ use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, TimestampTz, get_current_times
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use safekeeper_api::Term;
use safekeeper_api::models::{
HotStandbyFeedback, INVALID_FULL_TRANSACTION_ID, ReplicationFeedback, StandbyFeedback,
StandbyReply,
HotStandbyFeedback, INVALID_FULL_TRANSACTION_ID, ReplicationFeedback,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::watch::Receiver;
@@ -198,8 +197,8 @@ impl WalSenders {
}
/// Get aggregated hot standby feedback (we send it to compute).
pub fn get_hotstandby(self: &Arc<WalSenders>) -> StandbyFeedback {
self.mutex.lock().agg_standby_feedback
pub fn get_hotstandby(self: &Arc<WalSenders>) -> HotStandbyFeedback {
self.mutex.lock().agg_hs_feedback
}
/// Record new pageserver feedback, update aggregated values.
@@ -216,36 +215,14 @@ impl WalSenders {
self.walreceivers.broadcast_pageserver_feedback(*feedback);
}
/// Record standby reply.
fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
let mut shared = self.mutex.lock();
let slot = shared.get_slot_mut(id);
debug!(
"Record standby reply: ts={} apply_lsn={}",
reply.reply_ts, reply.apply_lsn
);
match &mut slot.get_mut_feedback() {
ReplicationFeedback::Standby(sf) => sf.reply = *reply,
ReplicationFeedback::Pageserver(_) => {
*slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback {
reply: *reply,
hs_feedback: HotStandbyFeedback::empty(),
})
}
}
}
/// Record hot standby feedback, update aggregated value.
fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
let mut shared = self.mutex.lock();
let slot = shared.get_slot_mut(id);
match &mut slot.get_mut_feedback() {
ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
ReplicationFeedback::Standby(sf) => *sf = *feedback,
ReplicationFeedback::Pageserver(_) => {
*slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: *feedback,
})
*slot.get_mut_feedback() = ReplicationFeedback::Standby(*feedback);
}
}
shared.update_reply_feedback();
@@ -272,7 +249,7 @@ impl WalSenders {
struct WalSendersShared {
// aggregated over all walsenders value
agg_standby_feedback: StandbyFeedback,
agg_hs_feedback: HotStandbyFeedback,
// last feedback ever received from any pageserver, empty if none
last_ps_feedback: PageserverFeedback,
// total counter of pageserver feedbacks received
@@ -324,7 +301,7 @@ impl WalSenderState {
impl WalSendersShared {
fn new() -> Self {
WalSendersShared {
agg_standby_feedback: StandbyFeedback::empty(),
agg_hs_feedback: HotStandbyFeedback::empty(),
last_ps_feedback: PageserverFeedback::empty(),
ps_feedback_counter: 0,
slots: Vec::new(),
@@ -341,14 +318,12 @@ impl WalSendersShared {
self.slots[id].as_mut().expect("walsender doesn't exist")
}
/// Update aggregated hot standy and normal reply feedbacks. We just take min of valid xmins
/// Update aggregated hot standy feedbacks. We just take min of valid xmins
/// and ts.
fn update_reply_feedback(&mut self) {
let mut agg = HotStandbyFeedback::empty();
let mut reply_agg = StandbyReply::empty();
for ws_state in self.slots.iter().flatten() {
if let ReplicationFeedback::Standby(standby_feedback) = ws_state.get_feedback() {
let hs_feedback = standby_feedback.hs_feedback;
if let ReplicationFeedback::Standby(hs_feedback) = ws_state.get_feedback() {
// doing Option math like op1.iter().chain(op2.iter()).min()
// would be nicer, but we serialize/deserialize this struct
// directly, so leave as is for now
@@ -368,41 +343,9 @@ impl WalSendersShared {
}
agg.ts = max(agg.ts, hs_feedback.ts);
}
let reply = standby_feedback.reply;
if reply.write_lsn != Lsn::INVALID {
if reply_agg.write_lsn != Lsn::INVALID {
reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
} else {
reply_agg.write_lsn = reply.write_lsn;
}
}
if reply.flush_lsn != Lsn::INVALID {
if reply_agg.flush_lsn != Lsn::INVALID {
reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
} else {
reply_agg.flush_lsn = reply.flush_lsn;
}
}
if reply.apply_lsn != Lsn::INVALID {
if reply_agg.apply_lsn != Lsn::INVALID {
reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
} else {
reply_agg.apply_lsn = reply.apply_lsn;
}
}
if reply.reply_ts != 0 {
if reply_agg.reply_ts != 0 {
reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
} else {
reply_agg.reply_ts = reply.reply_ts;
}
}
}
}
self.agg_standby_feedback = StandbyFeedback {
reply: reply_agg,
hs_feedback: agg,
};
self.agg_hs_feedback = agg;
}
}
@@ -1006,11 +949,30 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
.record_hs_feedback(self.ws_guard.id, &hs_feedback);
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply =
StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
self.ws_guard
.walsenders
.record_standby_reply(self.ws_guard.id, &reply);
// Earlier version sof the software did things with the standby reply.
// Current code is only interested in the hot standby data.
// For posterity, and potential future use, still maintain the code to parse it.
if cfg!(feature = "testing") {
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
struct StandbyReply {
write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
reply_requested: bool,
}
match StandbyReply::des(&msg[1..]) {
Ok(reply) => {
debug!(
"Record standby reply: ts={} apply_lsn={}",
reply.reply_ts, reply.apply_lsn
);
}
Err(e) => {
debug!("error deserializing standby reply: {e:?}");
}
}
}
}
Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
// pageserver sends this.
@@ -1037,6 +999,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
#[cfg(test)]
mod tests {
use postgres_ffi::TimestampTz;
use safekeeper_api::models::FullTransactionId;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
@@ -1068,13 +1031,10 @@ mod tests {
// form standby feedback with given hot standby feedback ts/xmin and the
// rest set to dummy values.
fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
ReplicationFeedback::Standby(StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: HotStandbyFeedback {
ts,
xmin,
catalog_xmin: 0,
},
ReplicationFeedback::Standby(HotStandbyFeedback {
ts,
xmin,
catalog_xmin: 0,
})
}
@@ -1084,10 +1044,7 @@ mod tests {
let mut wss = WalSendersShared::new();
push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
wss.update_reply_feedback();
assert_eq!(
wss.agg_standby_feedback.hs_feedback.xmin,
INVALID_FULL_TRANSACTION_ID
);
assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
}
#[test]
@@ -1097,6 +1054,6 @@ mod tests {
push_feedback(&mut wss, hs_feedback(1, 42));
push_feedback(&mut wss, hs_feedback(1, 64));
wss.update_reply_feedback();
assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
assert_eq!(wss.agg_hs_feedback.xmin, 42);
}
}

View File

@@ -355,7 +355,6 @@ impl SharedState {
&self,
ttid: &TenantTimelineId,
conf: &SafeKeeperConf,
standby_apply_lsn: Lsn,
) -> SafekeeperTimelineInfo {
SafekeeperTimelineInfo {
safekeeper_id: conf.my_id.0,
@@ -379,7 +378,6 @@ impl SharedState {
backup_lsn: self.sk.state().inmem.backup_lsn.0,
local_start_lsn: self.sk.state().local_start_lsn.0,
availability_zone: conf.availability_zone.clone(),
standby_horizon: standby_apply_lsn.0,
}
}
@@ -789,9 +787,8 @@ impl Timeline {
/// Get safekeeper info for broadcasting to broker and other peers.
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
let shared_state = self.read_shared_state().await;
shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
shared_state.get_safekeeper_info(&self.ttid, conf)
}
/// Update timeline state with peer safekeeper data.
@@ -1108,7 +1105,7 @@ impl WalResidentTimeline {
// if this is AppendResponse, fill in proper hot standby feedback.
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
resp.hs_feedback = self.walsenders.get_hotstandby();
}
}
Ok(rmsg)

View File

@@ -154,7 +154,6 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
https_connstr: Some("zenith-1-sk-1.local:7678".to_owned()),
local_start_lsn: 0,
availability_zone: None,
standby_horizon: 0,
};
counter += 1;
yield info;

View File

@@ -25,6 +25,7 @@ message SubscribeSafekeeperInfoRequest {
}
}
// Next ID: 16
message SafekeeperTimelineInfo {
uint64 safekeeper_id = 1;
TenantTimelineId tenant_timeline_id = 2;
@@ -42,7 +43,6 @@ message SafekeeperTimelineInfo {
uint64 remote_consistent_lsn = 7;
uint64 peer_horizon_lsn = 8;
uint64 local_start_lsn = 9;
uint64 standby_horizon = 14;
// A connection string to use for WAL receiving.
string safekeeper_connstr = 10;
// HTTP endpoint connection string.
@@ -99,6 +99,7 @@ message SafekeeperDiscoveryRequest {
}
// Shorter version of SafekeeperTimelineInfo, contains only necessary fields.
// Next ID: 7
message SafekeeperDiscoveryResponse {
uint64 safekeeper_id = 1;
TenantTimelineId tenant_timeline_id = 2;
@@ -108,6 +109,4 @@ message SafekeeperDiscoveryResponse {
string safekeeper_connstr = 4;
// Availability zone of a safekeeper.
optional string availability_zone = 5;
// Replica apply LSN
uint64 standby_horizon = 6;
}

View File

@@ -856,7 +856,6 @@ mod tests {
https_connstr: Some("neon-1-sk-1.local:7678".to_owned()),
local_start_lsn: 0,
availability_zone: None,
standby_horizon: 0,
})
}

View File

@@ -168,7 +168,6 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_last_record_lsn",
"pageserver_disk_consistent_lsn",
"pageserver_projected_remote_consistent_lsn",
"pageserver_standby_horizon",
"pageserver_smgr_query_seconds_bucket",
"pageserver_smgr_query_seconds_count",
"pageserver_smgr_query_seconds_sum",