mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 00:12:54 +00:00
fetch apply_lsn as part of monitor (can have a separate task at a later point, but this will do)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1348,6 +1348,7 @@ dependencies = [
|
||||
"p256 0.13.2",
|
||||
"pageserver_page_api",
|
||||
"postgres",
|
||||
"postgres-types",
|
||||
"postgres_initdb",
|
||||
"postgres_versioninfo",
|
||||
"regex",
|
||||
|
||||
@@ -67,6 +67,7 @@ uuid.workspace = true
|
||||
walkdir.workspace = true
|
||||
x509-cert.workspace = true
|
||||
|
||||
postgres-types.workspace = true
|
||||
postgres_versioninfo.workspace = true
|
||||
postgres_initdb.workspace = true
|
||||
compute_api.workspace = true
|
||||
|
||||
@@ -43,7 +43,7 @@ use crate::configurator::launch_configurator;
|
||||
use crate::disk_quota::set_disk_quota;
|
||||
use crate::installed_extensions::get_installed_extensions;
|
||||
use crate::logger::startup_context_from_env;
|
||||
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
|
||||
use crate::lsn_lease::{self, launch_lsn_lease_bg_task_for_static};
|
||||
use crate::metrics::COMPUTE_CTL_UP;
|
||||
use crate::monitor::launch_monitor;
|
||||
use crate::pg_helpers::*;
|
||||
@@ -131,6 +131,8 @@ pub struct ComputeNode {
|
||||
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
|
||||
pub(crate) lsn_lease_state: Arc<lsn_lease::State>,
|
||||
|
||||
/// Handle to the extension stats collection task
|
||||
extension_stats_task: TaskHandle,
|
||||
lfc_offload_task: TaskHandle,
|
||||
@@ -438,6 +440,7 @@ impl ComputeNode {
|
||||
compute_ctl_config: config.compute_ctl_config,
|
||||
extension_stats_task: Mutex::new(None),
|
||||
lfc_offload_task: Mutex::new(None),
|
||||
lsn_lease_state: Arc::new(lsn_lease::State::default()),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,24 @@ use utils::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct State {
|
||||
desired_lease_lsn: tokio::sync::watch::Sender<Option<Lsn>>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub fn update_desired_lease_lsn(&self, update: Lsn) {
|
||||
self.desired_lease_lsn.send_if_modified(|value| {
|
||||
let modified = *value != Some(update);
|
||||
if let Some(value) = *value && value > update {
|
||||
warn!(current=%value, new=%update, "desired lease lsn moving backwards, this shouldn't happen");
|
||||
}
|
||||
*value = Some(update);
|
||||
modified
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a background thread to periodically renew LSN leases for static compute.
|
||||
/// Do nothing if the compute is not in static mode.
|
||||
pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
|
||||
|
||||
@@ -4,9 +4,10 @@ use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::ComputeFeature;
|
||||
use compute_api::spec::{ComputeFeature, ComputeMode};
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{Level, error, info, instrument, span};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
|
||||
@@ -344,6 +345,41 @@ impl ComputeMonitor {
|
||||
}
|
||||
}
|
||||
|
||||
let mode: ComputeMode = self
|
||||
.compute
|
||||
.state
|
||||
.lock()
|
||||
.unwrap()
|
||||
.pspec
|
||||
.as_ref()
|
||||
.expect("we launch ComputeMonitor only after we received a spec")
|
||||
.spec
|
||||
.mode;
|
||||
match mode {
|
||||
// TODO: can the .spec.mode ever change? if it can (e.g. secondary promote to primary)
|
||||
// then we should make sure that lsn_lease_state transitions back to None so we stop renewing it.
|
||||
ComputeMode::Primary => (),
|
||||
ComputeMode::Static(lsn) => {
|
||||
self.compute.lsn_lease_state.update_desired_lease_lsn(lsn);
|
||||
}
|
||||
ComputeMode::Replica => {
|
||||
match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) {
|
||||
Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") {
|
||||
Ok(apply_lsn) => {
|
||||
let apply_lsn = Lsn(apply_lsn.into());
|
||||
self.compute.lsn_lease_state.update_desired_lease_lsn(apply_lsn);
|
||||
}
|
||||
Err(e) => {
|
||||
anyhow::bail!("parse apply_lsn: {e}");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
anyhow::bail!("query apply_lsn: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user