mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
Part of #7497, closes #8817. ## Problem See #8817. ## Summary of changes **compute_ctl** - Renew lsn lease as soon as `/configure` updates pageserver_connstr, use `state_changed` Condvar for synchronization. **pageserver** As mentioned in https://github.com/neondatabase/neon/issues/8817#issuecomment-2315768076, we still want some permanent error reported if a lease cannot be granted. By considering attachment mode and the added `lsn_lease_deadline` when processing lease requests, we can also bound the case of bad requests to a very short period after migration/restart. - Refactor https://github.com/neondatabase/neon/pull/9024 and move `lsn_lease_deadline` to `AttachedTenantConf` so timeline can easily access it. - Have separate HTTP `init_lsn_lease` and libpq `renew_lsn_lease` API. - Always do LSN verification for the initial HTTP lease request. - LSN verification for the renewal is **still done** when tenants are not in `AttachedSingle` and we have pass the `lsn_lease_deadline`, which give plenty of time for compute to renew the lease. **neon_local** - add and call `timeline_init_lsn_lease` mgmt_api at static endpoint start. The initial lsn lease http request is sent when we run `cargo neon endpoint start <static endpoint>`. ## Testing - Extend `test_readonly_node_gc` to do pageserver restarts and migration. ## Future Work - The control plane should make the initial lease request through HTTP when creating a static endpoint. This is currently only done in `neon_local`. Signed-off-by: Yuchen Liang <yuchen@neon.tech>
186 lines
6.2 KiB
Rust
186 lines
6.2 KiB
Rust
use anyhow::bail;
|
|
use anyhow::Result;
|
|
use postgres::{NoTls, SimpleQueryMessage};
|
|
use std::time::SystemTime;
|
|
use std::{str::FromStr, sync::Arc, thread, time::Duration};
|
|
use utils::id::TenantId;
|
|
use utils::id::TimelineId;
|
|
|
|
use compute_api::spec::ComputeMode;
|
|
use tracing::{info, warn};
|
|
use utils::{
|
|
lsn::Lsn,
|
|
shard::{ShardCount, ShardNumber, TenantShardId},
|
|
};
|
|
|
|
use crate::compute::ComputeNode;
|
|
|
|
/// Spawns a background thread to periodically renew LSN leases for static compute.
|
|
/// Do nothing if the compute is not in static mode.
|
|
pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
|
|
let (tenant_id, timeline_id, lsn) = {
|
|
let state = compute.state.lock().unwrap();
|
|
let spec = state.pspec.as_ref().expect("Spec must be set");
|
|
match spec.spec.mode {
|
|
ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
|
|
_ => return,
|
|
}
|
|
};
|
|
let compute = compute.clone();
|
|
|
|
let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
|
|
thread::spawn(move || {
|
|
let _entered = span.entered();
|
|
if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
|
|
// TODO: might need stronger error feedback than logging an warning.
|
|
warn!("Exited with error: {e}");
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Renews lsn lease periodically so static compute are not affected by GC.
|
|
fn lsn_lease_bg_task(
|
|
compute: Arc<ComputeNode>,
|
|
tenant_id: TenantId,
|
|
timeline_id: TimelineId,
|
|
lsn: Lsn,
|
|
) -> Result<()> {
|
|
loop {
|
|
let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
|
|
let valid_duration = valid_until
|
|
.duration_since(SystemTime::now())
|
|
.unwrap_or(Duration::ZERO);
|
|
|
|
// Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
|
|
let sleep_duration = valid_duration
|
|
.saturating_sub(Duration::from_secs(60))
|
|
.max(valid_duration / 2);
|
|
|
|
info!(
|
|
"Request succeeded, sleeping for {} seconds",
|
|
sleep_duration.as_secs()
|
|
);
|
|
compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
|
|
}
|
|
}
|
|
|
|
/// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
|
|
/// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
|
|
fn acquire_lsn_lease_with_retry(
|
|
compute: &Arc<ComputeNode>,
|
|
tenant_id: TenantId,
|
|
timeline_id: TimelineId,
|
|
lsn: Lsn,
|
|
) -> Result<SystemTime> {
|
|
let mut attempts = 0usize;
|
|
let mut retry_period_ms: f64 = 500.0;
|
|
const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
|
|
|
|
loop {
|
|
// Note: List of pageservers is dynamic, need to re-read configs before each attempt.
|
|
let configs = {
|
|
let state = compute.state.lock().unwrap();
|
|
|
|
let spec = state.pspec.as_ref().expect("spec must be set");
|
|
|
|
let conn_strings = spec.pageserver_connstr.split(',');
|
|
|
|
conn_strings
|
|
.map(|connstr| {
|
|
let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
|
|
if let Some(storage_auth_token) = &spec.storage_auth_token {
|
|
config.password(storage_auth_token.clone());
|
|
}
|
|
config
|
|
})
|
|
.collect::<Vec<_>>()
|
|
};
|
|
|
|
let result = try_acquire_lsn_lease(tenant_id, timeline_id, lsn, &configs);
|
|
match result {
|
|
Ok(Some(res)) => {
|
|
return Ok(res);
|
|
}
|
|
Ok(None) => {
|
|
bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
|
|
}
|
|
Err(e) => {
|
|
warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
|
|
|
|
compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
|
|
retry_period_ms as u64,
|
|
));
|
|
retry_period_ms *= 1.5;
|
|
retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
|
|
}
|
|
}
|
|
attempts += 1;
|
|
}
|
|
}
|
|
|
|
/// Tries to acquire an LSN lease through PS page_service API.
|
|
fn try_acquire_lsn_lease(
|
|
tenant_id: TenantId,
|
|
timeline_id: TimelineId,
|
|
lsn: Lsn,
|
|
configs: &[postgres::Config],
|
|
) -> Result<Option<SystemTime>> {
|
|
fn get_valid_until(
|
|
config: &postgres::Config,
|
|
tenant_shard_id: TenantShardId,
|
|
timeline_id: TimelineId,
|
|
lsn: Lsn,
|
|
) -> Result<Option<SystemTime>> {
|
|
let mut client = config.connect(NoTls)?;
|
|
let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
|
|
let res = client.simple_query(&cmd)?;
|
|
let msg = match res.first() {
|
|
Some(msg) => msg,
|
|
None => bail!("empty response"),
|
|
};
|
|
let row = match msg {
|
|
SimpleQueryMessage::Row(row) => row,
|
|
_ => bail!("error parsing lsn lease response"),
|
|
};
|
|
|
|
// Note: this will be None if a lease is explicitly not granted.
|
|
let valid_until_str = row.get("valid_until");
|
|
|
|
let valid_until = valid_until_str.map(|s| {
|
|
SystemTime::UNIX_EPOCH
|
|
.checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
|
|
.expect("Time larger than max SystemTime could handle")
|
|
});
|
|
Ok(valid_until)
|
|
}
|
|
|
|
let shard_count = configs.len();
|
|
|
|
let valid_until = if shard_count > 1 {
|
|
configs
|
|
.iter()
|
|
.enumerate()
|
|
.map(|(shard_number, config)| {
|
|
let tenant_shard_id = TenantShardId {
|
|
tenant_id,
|
|
shard_count: ShardCount::new(shard_count as u8),
|
|
shard_number: ShardNumber(shard_number as u8),
|
|
};
|
|
get_valid_until(config, tenant_shard_id, timeline_id, lsn)
|
|
})
|
|
.collect::<Result<Vec<Option<SystemTime>>>>()?
|
|
.into_iter()
|
|
.min()
|
|
.unwrap()
|
|
} else {
|
|
get_valid_until(
|
|
&configs[0],
|
|
TenantShardId::unsharded(tenant_id),
|
|
timeline_id,
|
|
lsn,
|
|
)?
|
|
};
|
|
|
|
Ok(valid_until)
|
|
}
|