mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
switch lease renewal to use lsn from the state fed by compute monitor
This commit is contained in:
@@ -43,7 +43,7 @@ use crate::configurator::launch_configurator;
|
||||
use crate::disk_quota::set_disk_quota;
|
||||
use crate::installed_extensions::get_installed_extensions;
|
||||
use crate::logger::startup_context_from_env;
|
||||
use crate::lsn_lease::{self, launch_lsn_lease_bg_task_for_static};
|
||||
use crate::lsn_lease::{self, launch_lsn_lease_bg_task};
|
||||
use crate::metrics::COMPUTE_CTL_UP;
|
||||
use crate::monitor::launch_monitor;
|
||||
use crate::pg_helpers::*;
|
||||
@@ -491,7 +491,7 @@ impl ComputeNode {
|
||||
this.wait_spec()?
|
||||
};
|
||||
|
||||
launch_lsn_lease_bg_task_for_static(&this);
|
||||
launch_lsn_lease_bg_task(&this);
|
||||
|
||||
// We have a spec, start the compute
|
||||
let mut delay_exit = false;
|
||||
|
||||
@@ -4,10 +4,11 @@ use std::thread;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use anyhow::{Result, bail};
|
||||
use compute_api::spec::{ComputeMode, PageserverProtocol};
|
||||
use compute_api::spec::PageserverProtocol;
|
||||
use itertools::Itertools as _;
|
||||
use pageserver_page_api as page_api;
|
||||
use postgres::{NoTls, SimpleQueryMessage};
|
||||
use tokio::sync::watch;
|
||||
use tracing::{info, warn};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
@@ -33,23 +34,19 @@ impl State {
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a background thread to periodically renew LSN leases for static compute.
|
||||
/// Do nothing if the compute is not in static mode.
|
||||
pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
|
||||
let (tenant_id, timeline_id, lsn) = {
|
||||
/// Spawns a background thread to periodically renew LSN leases.
|
||||
pub fn launch_lsn_lease_bg_task(compute: &Arc<ComputeNode>) {
|
||||
let (tenant_id, timeline_id) = {
|
||||
let state = compute.state.lock().unwrap();
|
||||
let spec = state.pspec.as_ref().expect("Spec must be set");
|
||||
match spec.spec.mode {
|
||||
ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
|
||||
_ => return,
|
||||
}
|
||||
(spec.tenant_id, spec.timeline_id)
|
||||
};
|
||||
let compute = compute.clone();
|
||||
|
||||
let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
|
||||
let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id);
|
||||
thread::spawn(move || {
|
||||
let _entered = span.entered();
|
||||
if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
|
||||
if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id) {
|
||||
// TODO: might need stronger error feedback than logging an warning.
|
||||
warn!("Exited with error: {e}");
|
||||
}
|
||||
@@ -61,10 +58,22 @@ fn lsn_lease_bg_task(
|
||||
compute: Arc<ComputeNode>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
let mut rx = compute.lsn_lease_state.desired_lease_lsn.subscribe();
|
||||
loop {
|
||||
let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
|
||||
let Some(valid_until) =
|
||||
acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, &rx)?
|
||||
else {
|
||||
info!(current=?*rx.borrow(), "sleeping until desired_lease_lsn changes to Some()");
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
rt.block_on(rx.wait_for(|current| current.is_some()))
|
||||
.expect("the Sender is kept alive in ComputeNode, which we hold a reference to");
|
||||
info!(current=?*rx.borrow(), "waking up from sleep");
|
||||
continue;
|
||||
};
|
||||
let valid_duration = valid_until
|
||||
.duration_since(SystemTime::now())
|
||||
.unwrap_or(Duration::ZERO);
|
||||
@@ -83,13 +92,15 @@ fn lsn_lease_bg_task(
|
||||
}
|
||||
|
||||
/// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
|
||||
/// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
|
||||
/// Returns an error if a lease is explicitly not granted.
|
||||
/// Returns None if the rx is None.
|
||||
/// Otherwise, we keep sending requests.
|
||||
fn acquire_lsn_lease_with_retry(
|
||||
compute: &Arc<ComputeNode>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
) -> Result<SystemTime> {
|
||||
rx: &watch::Receiver<Option<Lsn>>,
|
||||
) -> Result<Option<SystemTime>> {
|
||||
let mut attempts = 0usize;
|
||||
let mut retry_period_ms: f64 = 500.0;
|
||||
const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
|
||||
@@ -105,11 +116,16 @@ fn acquire_lsn_lease_with_retry(
|
||||
)
|
||||
};
|
||||
|
||||
// immediately copy the value so the borrow is short-lived, it's a best practice
|
||||
let Some(lsn) = *rx.borrow() else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let result =
|
||||
try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
|
||||
match result {
|
||||
Ok(Some(res)) => {
|
||||
return Ok(res);
|
||||
return Ok(Some(res));
|
||||
}
|
||||
Ok(None) => {
|
||||
bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
|
||||
|
||||
Reference in New Issue
Block a user