Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-05 20:42:54 +00:00)

WIP: standby_horizon leases
Cargo.lock (generated, 1 line changed)
@@ -1348,6 +1348,7 @@ dependencies = [
  "p256 0.13.2",
  "pageserver_page_api",
  "postgres",
+ "postgres-types",
  "postgres_initdb",
  "postgres_versioninfo",
  "regex",
@@ -67,6 +67,7 @@ uuid.workspace = true
 walkdir.workspace = true
 x509-cert.workspace = true
 
+postgres-types.workspace = true
 postgres_versioninfo.workspace = true
 postgres_initdb.workspace = true
 compute_api.workspace = true
@@ -46,7 +46,6 @@ use crate::logger::startup_context_from_env;
 use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
 use crate::metrics::COMPUTE_CTL_UP;
 use crate::monitor::launch_monitor;
-use crate::pg_helpers::*;
 use crate::pgbouncer::*;
 use crate::rsyslog::{
     PostgresLogsRsyslogConfig, configure_audit_rsyslog, configure_postgres_logs_export,
@@ -57,6 +56,7 @@ use crate::swap::resize_swap;
 use crate::sync_sk::{check_if_synced, ping_safekeeper};
 use crate::tls::watch_cert_for_changes;
 use crate::{config, extension_server, local_proxy};
+use crate::{pg_helpers::*, ro_replica};
 
 pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
 pub static PG_PID: AtomicU32 = AtomicU32::new(0);
@@ -131,6 +131,8 @@ pub struct ComputeNode {
     pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
     pub compute_ctl_config: ComputeCtlConfig,
 
+    pub(crate) ro_replica: Arc<ro_replica::GlobalState>,
+
     /// Handle to the extension stats collection task
     extension_stats_task: TaskHandle,
     lfc_offload_task: TaskHandle,
@@ -438,6 +440,7 @@ impl ComputeNode {
             compute_ctl_config: config.compute_ctl_config,
             extension_stats_task: Mutex::new(None),
             lfc_offload_task: Mutex::new(None),
+            ro_replica: Arc::new(ro_replica::GlobalState::default()),
         })
     }
 
@@ -490,6 +493,8 @@ impl ComputeNode {
 
         launch_lsn_lease_bg_task_for_static(&this);
 
+        ro_replica::spawn_bg_task(Arc::clone(&this));
+
         // We have a spec, start the compute
         let mut delay_exit = false;
         let mut vm_monitor = None;
@@ -23,9 +23,11 @@ pub mod monitor;
 pub mod params;
 pub mod pg_helpers;
 pub mod pgbouncer;
+pub(crate) mod ro_replica;
 pub mod rsyslog;
 pub mod spec;
 mod spec_apply;
 pub mod swap;
 pub mod sync_sk;
 pub mod tls;
+pub mod pageserver_client;
@@ -5,15 +5,15 @@ use std::time::{Duration, SystemTime};
 
 use anyhow::{Result, bail};
 use compute_api::spec::{ComputeMode, PageserverProtocol};
-use itertools::Itertools as _;
 use pageserver_page_api as page_api;
 use postgres::{NoTls, SimpleQueryMessage};
 use tracing::{info, warn};
-use utils::id::{TenantId, TimelineId};
+use utils::id::TimelineId;
 use utils::lsn::Lsn;
-use utils::shard::{ShardCount, ShardNumber, TenantShardId};
+use utils::shard::TenantShardId;
 
 use crate::compute::ComputeNode;
+use crate::pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect};
 
 /// Spawns a background thread to periodically renew LSN leases for static computes.
 /// Does nothing if the compute is not in static mode.
@@ -31,7 +31,7 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
     let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
     thread::spawn(move || {
         let _entered = span.entered();
-        if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
+        if let Err(e) = lsn_lease_bg_task(compute, timeline_id, lsn) {
             // TODO: might need stronger error feedback than logging a warning.
             warn!("Exited with error: {e}");
         }
@@ -39,14 +39,9 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
 }
 
 /// Renews the LSN lease periodically so static computes are not affected by GC.
-fn lsn_lease_bg_task(
-    compute: Arc<ComputeNode>,
-    tenant_id: TenantId,
-    timeline_id: TimelineId,
-    lsn: Lsn,
-) -> Result<()> {
+fn lsn_lease_bg_task(compute: Arc<ComputeNode>, timeline_id: TimelineId, lsn: Lsn) -> Result<()> {
     loop {
-        let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
+        let valid_until = acquire_lsn_lease_with_retry(&compute, timeline_id, lsn)?;
         let valid_duration = valid_until
             .duration_since(SystemTime::now())
             .unwrap_or(Duration::ZERO);
@@ -68,7 +63,6 @@ fn lsn_lease_bg_task(
 /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
 fn acquire_lsn_lease_with_retry(
     compute: &Arc<ComputeNode>,
-    tenant_id: TenantId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<SystemTime> {
@@ -78,17 +72,13 @@ fn acquire_lsn_lease_with_retry(
 
     loop {
         // Note: the list of pageservers is dynamic, we need to re-read the configs before each attempt.
-        let (connstrings, auth) = {
+        let shards = {
             let state = compute.state.lock().unwrap();
-            let spec = state.pspec.as_ref().expect("spec must be set");
-            (
-                spec.pageserver_connstr.clone(),
-                spec.storage_auth_token.clone(),
-            )
+            let pspec = state.pspec.as_ref().expect("spec must be set");
+            pageserver_connstrings_for_connect(pspec)
         };
 
-        let result =
-            try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
+        let result = try_acquire_lsn_lease(shards, timeline_id, lsn);
         match result {
             Ok(Some(res)) => {
                 return Ok(res);
@@ -112,33 +102,32 @@ fn acquire_lsn_lease_with_retry(
 
 /// Tries to acquire LSN leases on all Pageserver shards.
 fn try_acquire_lsn_lease(
-    connstrings: &str,
-    auth: Option<&str>,
-    tenant_id: TenantId,
+    shards: Vec<ConnectInfo>,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<Option<SystemTime>> {
-    let connstrings = connstrings.split(',').collect_vec();
-    let shard_count = connstrings.len();
     let mut leases = Vec::new();
 
-    for (shard_number, &connstring) in connstrings.iter().enumerate() {
-        let tenant_shard_id = match shard_count {
-            0 | 1 => TenantShardId::unsharded(tenant_id),
-            shard_count => TenantShardId {
-                tenant_id,
-                shard_number: ShardNumber(shard_number as u8),
-                shard_count: ShardCount::new(shard_count as u8),
-            },
-        };
-
-        let lease = match PageserverProtocol::from_connstring(connstring)? {
-            PageserverProtocol::Libpq => {
-                acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
-            }
-            PageserverProtocol::Grpc => {
-                acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
-            }
+    for ConnectInfo {
+        tenant_shard_id,
+        connstring,
+        auth,
+    } in shards
+    {
+        let lease = match PageserverProtocol::from_connstring(&connstring)? {
+            PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
+                &connstring,
+                auth.as_deref(),
+                tenant_shard_id,
+                timeline_id,
+                lsn,
+            )?,
+            PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
+                &connstring,
+                auth.as_deref(),
+                tenant_shard_id,
+                timeline_id,
+                lsn,
+            )?,
         };
         leases.push(lease);
     }
@@ -4,9 +4,10 @@ use std::time::Duration;
 
 use chrono::{DateTime, Utc};
 use compute_api::responses::ComputeStatus;
-use compute_api::spec::ComputeFeature;
+use compute_api::spec::{ComputeFeature, ComputeMode};
 use postgres::{Client, NoTls};
 use tracing::{Level, error, info, instrument, span};
+use utils::lsn::Lsn;
 
 use crate::compute::ComputeNode;
 use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
@@ -344,6 +345,42 @@ impl ComputeMonitor {
             }
         }
 
+        let mode: ComputeMode = self
+            .compute
+            .state
+            .lock()
+            .unwrap()
+            .pspec
+            .as_ref()
+            .expect("we launch ComputeMonitor only after we received a spec")
+            .spec
+            .mode;
+        match mode {
+            // TODO: can the .spec.mode ever change? If it can (e.g. a secondary promoted to primary),
+            // we should make sure that lsn_lease_state transitions back to None so we stop renewing it.
+            ComputeMode::Primary => (),
+            ComputeMode::Static(_) => (),
+            ComputeMode::Replica => {
+                // TODO: instead of apply_lsn, use the min inflight request LSN
+                match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) {
+                    Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") {
+                        Ok(apply_lsn) => {
+                            let apply_lsn = Lsn(apply_lsn.into());
+                            self.compute
+                                .ro_replica
+                                .update_min_inflight_request_lsn(apply_lsn);
+                        }
+                        Err(e) => {
+                            anyhow::bail!("parse apply_lsn: {e}");
+                        }
+                    },
+                    Err(e) => {
+                        anyhow::bail!("query apply_lsn: {e}");
+                    }
+                }
+            }
+        }
+
         Ok(())
     }
 }
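The monitor loop above is the producer half of a tokio watch channel; the consumer is the ro_replica background task introduced below, which wakes only when the published apply LSN actually changes. A minimal self-contained sketch of that handshake (simplified names, made-up LSNs; assumes a tokio dependency with the rt, sync, time, and macros features):

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Producer side: send_if_modified only wakes waiters on real changes,
    // which is how update_min_inflight_request_lsn deduplicates updates.
    let (tx, mut rx) = tokio::sync::watch::channel::<Option<u64>>(None);

    tokio::spawn(async move {
        for lsn in [100u64, 100, 150] {
            tx.send_if_modified(|value| {
                let modified = *value != Some(lsn);
                *value = Some(lsn);
                modified
            });
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        // Keep the sender alive briefly so the consumer can observe changes.
        tokio::time::sleep(Duration::from_millis(50)).await;
    });

    // Consumer side: block until the first value arrives, as bg_task does;
    // after that, unwrapping the contents is safe.
    let first = *rx.wait_for(|v| v.is_some()).await.unwrap();
    println!("first lsn: {}", first.unwrap());
}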
compute_tools/src/pageserver_client.rs (new file, 38 lines)
@@ -0,0 +1,38 @@
+use itertools::Itertools;
+use utils::shard::{ShardCount, ShardNumber, TenantShardId};
+
+use crate::compute::ParsedSpec;
+
+pub struct ConnectInfo {
+    pub tenant_shard_id: TenantShardId,
+    pub connstring: String,
+    pub auth: Option<String>,
+}
+
+pub fn pageserver_connstrings_for_connect(pspec: &ParsedSpec) -> Vec<ConnectInfo> {
+    let connstrings = &pspec.pageserver_connstr;
+    let auth = pspec.storage_auth_token.clone();
+
+    let connstrings = connstrings.split(',').collect_vec();
+    let shard_count = connstrings.len();
+
+    let mut infos = Vec::with_capacity(connstrings.len());
+    for (shard_number, connstring) in connstrings.iter().enumerate() {
+        let tenant_shard_id = match shard_count {
+            0 | 1 => TenantShardId::unsharded(pspec.tenant_id),
+            shard_count => TenantShardId {
+                tenant_id: pspec.tenant_id,
+                shard_number: ShardNumber(shard_number as u8),
+                shard_count: ShardCount::new(shard_count as u8),
+            },
+        };
+
+        infos.push(ConnectInfo {
+            tenant_shard_id,
+            connstring: connstring.to_string(),
+            auth: auth.clone(),
+        });
+    }
+
+    infos
+}
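To make the shard mapping concrete: entry i of the comma-separated connstring list is shard i of shard_count, and a single entry means the tenant is unsharded. A standalone sketch of the same rule (endpoints are made up; println output stands in for TenantShardId construction):

fn main() {
    // Hypothetical three-shard pageserver_connstr as it might appear in a compute spec.
    let connstr = "grpc://ps-0:51051,grpc://ps-1:51051,grpc://ps-2:51051";
    let connstrings: Vec<&str> = connstr.split(',').collect();
    let shard_count = connstrings.len();

    for (shard_number, cs) in connstrings.iter().enumerate() {
        // Mirrors pageserver_connstrings_for_connect: the position in the list is
        // the shard number; a single entry maps to TenantShardId::unsharded.
        match shard_count {
            0 | 1 => println!("unsharded -> {cs}"),
            n => println!("shard {shard_number}/{n} -> {cs}"),
        }
    }
}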
compute_tools/src/ro_replica.rs (new file, 297 lines)
@@ -0,0 +1,297 @@
+use std::{
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    time::{Duration, SystemTime},
+};
+
+use compute_api::spec::PageserverProtocol;
+use futures::{StreamExt, stream::FuturesUnordered};
+use postgres::SimpleQueryMessage;
+use tokio_util::sync::CancellationToken;
+use tracing::{Instrument, error, info, info_span, instrument, warn};
+use utils::{backoff::retry, id::TimelineId, lsn::Lsn};
+
+use crate::{
+    compute::ComputeNode,
+    pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect},
+};
+
+#[derive(Default)]
+pub(crate) struct GlobalState {
+    min_inflight_request_lsn: tokio::sync::watch::Sender<Option<Lsn>>,
+}
+
+impl GlobalState {
+    pub fn update_min_inflight_request_lsn(&self, update: Lsn) {
+        self.min_inflight_request_lsn.send_if_modified(|value| {
+            let modified = *value != Some(update);
+            if let Some(value) = *value && value > update {
+                warn!(current=%value, new=%update, "min inflight request lsn moving backwards, this should not happen, bug in communicator");
+            }
+            *value = Some(update);
+            modified
+        });
+    }
+}
+
+pub fn spawn_bg_task(compute: Arc<ComputeNode>) {
+    std::thread::spawn(|| {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        rt.block_on(bg_task(compute))
+    });
+}
+
+#[instrument(name = "standby_horizon_lease", skip_all, fields(lease_id))]
+async fn bg_task(compute: Arc<ComputeNode>) {
+    // Use a lease_id that is globally unique to this process to maximize attribution precision & log correlation.
+    let lease_id = format!("v1-{}-{}", compute.params.compute_id, std::process::id());
+    tracing::Span::current().record("lease_id", tracing::field::display(&lease_id));
+
+    // Wait until we have the first value.
+    // This allows us to simply .unwrap() later, because the value never transitions back to None.
+    info!("waiting for first lease lsn to be fetched from postgres");
+    let mut min_inflight_request_lsn_changed =
+        compute.ro_replica.min_inflight_request_lsn.subscribe();
+    min_inflight_request_lsn_changed.mark_changed(); // it could have been set already
+    min_inflight_request_lsn_changed
+        .wait_for(|value| value.is_some())
+        .await
+        .expect("we never drop the sender");
+
+    // React to connstring changes. Sadly there is no async API for this yet.
+    let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(None);
+    std::thread::spawn({
+        let compute = Arc::clone(&compute);
+        move || {
+            loop {
+                compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::MAX);
+                let new = compute
+                    .state
+                    .lock()
+                    .unwrap()
+                    .pspec
+                    .as_ref()
+                    .and_then(|pspec| pspec.spec.pageserver_connstring.clone());
+                connstr_watch_tx.send_if_modified(|existing| {
+                    if &new != existing {
+                        *existing = new;
+                        true
+                    } else {
+                        false
+                    }
+                });
+            }
+        }
+    });
+
+    let mut obtained = ObtainedLease {
+        lsn: Lsn(0),
+        nearest_expiration: SystemTime::UNIX_EPOCH,
+    };
+    loop {
+        let valid_duration = obtained
+            .nearest_expiration
+            .duration_since(SystemTime::now())
+            .unwrap_or_default();
+        // Sleep for 60 seconds less than the valid duration, but at least half of the valid duration.
+        let sleep_duration = valid_duration
+            .saturating_sub(Duration::from_secs(60))
+            .max(valid_duration / 2);
+
+        tokio::select! {
+            _ = tokio::time::sleep(sleep_duration) => {
+                info!("updating because lease is going to expire soon");
+            }
+            _ = connstr_watch_rx.changed() => {
+                info!("updating due to changed pageserver_connstr")
+            }
+            _ = async {
+                // debounce; TODO make this lower in tests
+                tokio::time::sleep(Duration::from_secs(10)).await;
+                // every 10 GiB; TODO make this tighter in tests?
+                let max_horizon_lag = 10 * (1<<30);
+                min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > obtained.lsn.0 + max_horizon_lag).await
+            } => {
+                info!(%obtained.lsn, "updating due to max horizon lag");
+            }
+        }
+        // retry forever
+        let compute = Arc::clone(&compute);
+        let lease_id = lease_id.clone();
+        obtained = retry(
+            || attempt(lease_id.clone(), &compute),
+            |_| false,
+            0,
+            u32::MAX, // forever
+            "update standby_horizon position in pageserver",
+            // There is no cancellation story in compute_ctl
+            &CancellationToken::new(),
+        )
+        .await
+        .expect("is_permanent returns false, so retry always returns Some")
+        .expect("u32::MAX exceeded");
+    }
+}
+
+struct ObtainedLease {
+    lsn: Lsn,
+    nearest_expiration: SystemTime,
+}
+
+async fn attempt(lease_id: String, compute: &Arc<ComputeNode>) -> anyhow::Result<ObtainedLease> {
+    let (shards, timeline_id) = {
+        let state = compute.state.lock().unwrap();
+        let pspec = state.pspec.as_ref().expect("spec must be set");
+        (pageserver_connstrings_for_connect(pspec), pspec.timeline_id)
+    };
+
+    let lsn = compute
+        .ro_replica
+        .min_inflight_request_lsn
+        .borrow()
+        .expect("we only call this function once it has been transitioned to Some");
+
+    let mut futs = FuturesUnordered::new();
+    for connect_info in shards {
+        let logging_span = info_span!(
+            "attempt_one",
+            tenant_id=%connect_info.tenant_shard_id.tenant_id,
+            shard_id=%connect_info.tenant_shard_id.shard_slug(),
+            timeline_id=%timeline_id,
+        );
+        let logging_wrapper =
+            |fut: Pin<Box<dyn Future<Output = anyhow::Result<Option<SystemTime>>>>>| {
+                async move {
+                    // TODO: timeout?
+                    match fut.await {
+                        Ok(Some(v)) => {
+                            info!("lease obtained");
+                            Ok(Some(v))
+                        }
+                        Ok(None) => {
+                            error!("pageserver rejected our request");
+                            Ok(None)
+                        }
+                        Err(err) => {
+                            error!("communication failure: {err:?}");
+                            Err(())
+                        }
+                    }
+                }
+                .instrument(logging_span)
+            };
+        let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? {
+            PageserverProtocol::Libpq => logging_wrapper(Box::pin(attempt_one_libpq(
+                connect_info,
+                timeline_id,
+                lease_id.clone(),
+                lsn,
+            ))),
+            PageserverProtocol::Grpc => logging_wrapper(Box::pin(attempt_one_grpc(
+                connect_info,
+                timeline_id,
+                lease_id.clone(),
+                lsn,
+            ))),
+        };
+        futs.push(fut);
+    }
+    let mut errors = 0;
+    let mut nearest_expiration = None;
+    while let Some(res) = futs.next().await {
+        match res {
+            Ok(Some(expiration)) => {
+                let nearest_expiration = nearest_expiration.get_or_insert(expiration);
+                *nearest_expiration = std::cmp::min(*nearest_expiration, expiration);
+            }
+            Ok(None) | Err(()) => {
+                // the logging wrapper does the logging
+                errors += 1;
+            }
+        }
+    }
+    if errors > 0 {
+        return Err(anyhow::anyhow!(
+            "failed to advance standby_horizon for {errors} shards, check logs for details"
+        ));
+    }
+    match nearest_expiration {
+        Some(nearest_expiration) => Ok(ObtainedLease {
+            lsn,
+            nearest_expiration,
+        }),
+        None => Err(anyhow::anyhow!("pageserver connstrings list is empty")), // this probably can't happen
+    }
+}
+
+async fn attempt_one_libpq(
+    connect_info: ConnectInfo,
+    timeline_id: TimelineId,
+    lease_id: String,
+    lsn: Lsn,
+) -> anyhow::Result<Option<SystemTime>> {
+    let ConnectInfo {
+        tenant_shard_id,
+        connstring,
+        auth,
+    } = connect_info;
+    let mut config = tokio_postgres::Config::from_str(&connstring)?;
+    if let Some(auth) = auth {
+        config.password(auth);
+    }
+    let (client, conn) = config.connect(postgres::NoTls).await?;
+    tokio::spawn(conn);
+    let cmd = format!("lease standby_horizon {tenant_shard_id} {timeline_id} {lease_id} {lsn} ");
+    let res = client.simple_query(&cmd).await?;
+    let msg = match res.first() {
+        Some(msg) => msg,
+        None => anyhow::bail!("empty response"),
+    };
+    let row = match msg {
+        SimpleQueryMessage::Row(row) => row,
+        _ => anyhow::bail!("expected row message type"),
+    };
+
+    // Note: this will be None if a lease is explicitly not granted.
+    let Some(expiration) = row.get("expiration") else {
+        return Ok(None);
+    };
+
+    let expiration =
+        SystemTime::UNIX_EPOCH.checked_add(Duration::from_millis(u64::from_str(expiration)?));
+    Ok(expiration)
+}
+
+async fn attempt_one_grpc(
+    connect_info: ConnectInfo,
+    timeline_id: TimelineId,
+    lease_id: String,
+    lsn: Lsn,
+) -> anyhow::Result<Option<SystemTime>> {
+    let ConnectInfo {
+        tenant_shard_id,
+        connstring,
+        auth,
+    } = connect_info;
+    let mut client = pageserver_page_api::Client::connect(
+        connstring.to_string(),
+        tenant_shard_id.tenant_id,
+        timeline_id,
+        tenant_shard_id.to_index(),
+        auth.map(String::from),
+        None,
+    )
+    .await?;
+
+    let req = pageserver_page_api::LeaseStandbyHorizonRequest { lease_id, lsn };
+    match client.lease_standby_horizon(req).await {
+        Ok(expires) => Ok(Some(expires)),
+        // Lease couldn't be acquired
+        Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
+        Err(err) => Err(err.into()),
+    }
+}
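The renewal schedule in bg_task deserves a worked example: renew one minute before expiry, but never sleep for less than half the lease's remaining validity, which bounds the request rate when leases are short. A minimal sketch of that policy:

use std::time::Duration;

// Mirrors the sleep_duration computation in bg_task above.
fn sleep_duration(valid: Duration) -> Duration {
    valid.saturating_sub(Duration::from_secs(60)).max(valid / 2)
}

fn main() {
    // 10-minute lease: wake 60s before expiry.
    assert_eq!(sleep_duration(Duration::from_secs(600)), Duration::from_secs(540));
    // 90s lease: 90s - 60s = 30s would renew too eagerly; half the validity (45s) wins.
    assert_eq!(sleep_duration(Duration::from_secs(90)), Duration::from_secs(45));
    // Expired lease: renew immediately.
    assert_eq!(sleep_duration(Duration::ZERO), Duration::ZERO);
    println!("ok");
}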
@@ -70,6 +70,18 @@ service PageService {
   // Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
   // collect the LSN until the lease expires. Must be acquired on all relevant shards.
   rpc LeaseLsn (LeaseLsnRequest) returns (LeaseLsnResponse);
+
+  // Upserts a standby_horizon lease. RO replicas rely on this type of lease.
+  // In slightly more detail: RO replicas always lag to some degree behind the
+  // primary, and request pages at their respective apply LSN. The standby horizon mechanism
+  // ensures that the Pageserver does not garbage-collect old page versions in
+  // the interval between `min(valid standby horizon leases)` and the most recent page version.
+  //
+  // Each RO replica calls this method continuously as it applies more WAL.
+  // It identifies its lease through an opaque "lease_id" across these requests.
+  // The response contains the lease expiration time.
+  // Status `FailedPrecondition` is returned if the lease cannot be granted.
+  rpc LeaseStandbyHorizon(LeaseStandbyHorizonRequest) returns (LeaseStandbyHorizonResponse);
 }
 
 // The LSN a request should read at.
@@ -272,3 +284,16 @@ message LeaseLsnResponse {
   // The lease expiration time.
   google.protobuf.Timestamp expires = 1;
 }
+
+// Request for the LeaseStandbyHorizon rpc.
+// The lease_id identifies the lease in subsequent requests.
+// The lsn must be monotonic; the request will fail if it is not.
+message LeaseStandbyHorizonRequest {
+  string lease_id = 1;
+  uint64 lsn = 2;
+}
+
+// Response for the success case of the LeaseStandbyHorizon rpc.
+message LeaseStandbyHorizonResponse {
+  google.protobuf.Timestamp expiration = 1;
+}
@@ -143,6 +143,12 @@ impl Client {
         let resp = self.inner.lease_lsn(req).await?.into_inner();
         Ok(resp.try_into()?)
     }
+
+    pub async fn lease_standby_horizon(&mut self, req: LeaseStandbyHorizonRequest) -> tonic::Result<LeaseStandbyHorizonResponse> {
+        let req = proto::LeaseStandbyHorizonRequest::from(req);
+        let resp = self.inner.lease_standby_horizon(req).await?.into_inner();
+        Ok(resp.try_into()?)
+    }
 }
 
 /// Adds authentication metadata to gRPC requests.
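A hedged usage sketch of the new client method, mirroring the call shape in compute_tools' attempt_one_grpc above (the lease_id value is a placeholder; the two trailing None arguments are the auth token and the final connect parameter exactly as passed there):

use pageserver_page_api::{Client, LeaseStandbyHorizonRequest};
use utils::{id::{TenantId, TimelineId}, lsn::Lsn, shard::ShardIndex};

async fn renew_once(
    endpoint: String,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    shard_index: ShardIndex,
    apply_lsn: Lsn,
) -> anyhow::Result<std::time::SystemTime> {
    let mut client =
        Client::connect(endpoint, tenant_id, timeline_id, shard_index, None, None).await?;
    let expires = client
        .lease_standby_horizon(LeaseStandbyHorizonRequest {
            lease_id: "v1-compute-1234".to_string(), // placeholder lease id
            lsn: apply_lsn,
        })
        .await?; // a FailedPrecondition status here means the lease was refused
    Ok(expires)
}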
@@ -755,3 +755,64 @@ impl From<LeaseLsnResponse> for proto::LeaseLsnResponse {
         }
     }
 }
+
+pub struct LeaseStandbyHorizonRequest {
+    pub lease_id: String,
+    pub lsn: Lsn,
+}
+
+impl TryFrom<proto::LeaseStandbyHorizonRequest> for LeaseStandbyHorizonRequest {
+    type Error = ProtocolError;
+
+    fn try_from(pb: proto::LeaseStandbyHorizonRequest) -> Result<Self, Self::Error> {
+        if pb.lsn == 0 {
+            return Err(ProtocolError::Missing("lsn"));
+        }
+        if pb.lease_id.is_empty() {
+            return Err(ProtocolError::Invalid("lease_id", pb.lease_id));
+        }
+        Ok(Self {
+            lease_id: pb.lease_id,
+            lsn: Lsn(pb.lsn),
+        })
+    }
+}
+
+impl From<LeaseStandbyHorizonRequest> for proto::LeaseStandbyHorizonRequest {
+    fn from(request: LeaseStandbyHorizonRequest) -> Self {
+        Self {
+            lease_id: request.lease_id,
+            lsn: request.lsn.0,
+        }
+    }
+}
+
+/// Lease expiration time. If the lease could not be granted because the LSN has already been
+/// garbage collected, a FailedPrecondition status will be returned instead.
+pub type LeaseStandbyHorizonResponse = SystemTime;
+
+impl TryFrom<proto::LeaseStandbyHorizonResponse> for LeaseStandbyHorizonResponse {
+    type Error = ProtocolError;
+
+    fn try_from(pb: proto::LeaseStandbyHorizonResponse) -> Result<Self, Self::Error> {
+        let expiration = pb.expiration.ok_or(ProtocolError::Missing("expiration"))?;
+        UNIX_EPOCH
+            .checked_add(Duration::new(
+                expiration.seconds as u64,
+                expiration.nanos as u32,
+            ))
+            .ok_or_else(|| ProtocolError::invalid("expiration", expiration))
+    }
+}
+
+impl From<LeaseStandbyHorizonResponse> for proto::LeaseStandbyHorizonResponse {
+    fn from(response: LeaseStandbyHorizonResponse) -> Self {
+        let expiration = response.duration_since(UNIX_EPOCH).unwrap_or_default();
+        Self {
+            expiration: Some(prost_types::Timestamp {
+                seconds: expiration.as_secs() as i64,
+                nanos: expiration.subsec_nanos() as i32,
+            }),
+        }
+    }
+}
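For intuition, the Timestamp mapping above is the usual seconds-plus-nanos offset from the Unix epoch; a self-contained round-trip sketch (assuming prost-types as a direct dependency):

use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn main() {
    let expiration = SystemTime::now() + Duration::from_secs(600);

    // SystemTime -> protobuf Timestamp (seconds + subsecond nanos since the epoch).
    let d = expiration.duration_since(UNIX_EPOCH).unwrap_or_default();
    let ts = prost_types::Timestamp {
        seconds: d.as_secs() as i64,
        nanos: d.subsec_nanos() as i32,
    };

    // Timestamp -> SystemTime, as in the TryFrom impl above.
    let back = UNIX_EPOCH + Duration::new(ts.seconds as u64, ts.nanos as u32);
    assert_eq!(back, expiration);
}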
@@ -2327,6 +2327,7 @@ pub(crate) enum ComputeCommandKind {
     Basebackup,
     Fullbackup,
     LeaseLsn,
+    LeaseStandbyHorizon,
 }
 
 pub(crate) struct ComputeCommandCounters {
@@ -76,6 +76,7 @@ use crate::pgdatadir_mapping::{LsnRange, Version};
 use crate::span::{
     debug_assert_current_span_has_tenant_and_timeline_id,
     debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
+    debug_assert_current_span_has_tenant_id,
 };
 use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
 use crate::tenant::mgr::{
@@ -2218,7 +2219,7 @@ impl PageServerHandler {
             valid_until_str.as_deref().unwrap_or("<unknown>")
         );
 
-        let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
+        let bytes: Option<&[u8]> = valid_until_str.as_ref().map(|x| x.as_bytes());
 
         pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
             b"valid_until",
@@ -2228,6 +2229,56 @@ impl PageServerHandler {
         Ok(())
     }
 
+    #[instrument(skip_all, fields(shard_id, %lsn))]
+    async fn handle_lease_standby_horizon<IO>(
+        &mut self,
+        pgb: &mut PostgresBackend<IO>,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        lease_id: String,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<(), QueryError>
+    where
+        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
+    {
+        debug_assert_current_span_has_tenant_id();
+
+        let timeline = self
+            .timeline_handles
+            .as_mut()
+            .unwrap()
+            .get(
+                tenant_shard_id.tenant_id,
+                timeline_id,
+                ShardSelector::Known(tenant_shard_id.to_index()),
+            )
+            .await?;
+        set_tracing_field_shard_id(&timeline);
+
+        let result: Option<SystemTime> = timeline
+            .lease_standby_horizon(lease_id, lsn, ctx)
+            .inspect_err(|e| {
+                warn!("{e}");
+            })
+            .ok();
+
+        // Encode the result as Option<millis since epoch>
+        let bytes = result.map(|t| {
+            t.duration_since(SystemTime::UNIX_EPOCH)
+                .expect("we wouldn't allow a lease at epoch, system time would be horribly off")
+                .as_millis()
+                .to_string()
+                .into_bytes()
+        });
+        pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
+            b"expiration",
+        )]))?
+        .write_message_noflush(&BeMessage::DataRow(&[bytes.as_deref()]))?;
+
+        Ok(())
+    }
+
     #[instrument(skip_all, fields(shard_id))]
     async fn handle_get_rel_exists_request(
         timeline: &Timeline,
@@ -2718,6 +2769,14 @@ struct LeaseLsnCmd {
     lsn: Lsn,
 }
 
+#[derive(Debug, Clone, Eq, PartialEq)]
+struct LeaseStandbyHorizonCmd {
+    tenant_shard_id: TenantShardId,
+    timeline_id: TimelineId,
+    lease_id: String,
+    lsn: Lsn,
+}
+
 #[derive(Debug, Clone, Eq, PartialEq)]
 enum PageServiceCmd {
     Set,
@@ -2725,6 +2784,7 @@ enum PageServiceCmd {
     BaseBackup(BaseBackupCmd),
     FullBackup(FullBackupCmd),
     LeaseLsn(LeaseLsnCmd),
+    LeaseStandbyHorizon(LeaseStandbyHorizonCmd),
 }
 
 impl PageStreamCmd {
@@ -2874,6 +2934,31 @@ impl LeaseLsnCmd {
     }
 }
 
+impl LeaseStandbyHorizonCmd {
+    fn parse(query: &str) -> anyhow::Result<Self> {
+        let parameters = query.split_whitespace().collect_vec();
+        if parameters.len() != 4 {
+            bail!(
+                "invalid number of parameters for lease standby_horizon command: {}",
+                query
+            );
+        }
+        let tenant_shard_id = TenantShardId::from_str(parameters[0])
+            .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
+        let timeline_id = TimelineId::from_str(parameters[1])
+            .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
+        let lease_id = parameters[2].to_string();
+        let standby_horizon = Lsn::from_str(parameters[3])
+            .with_context(|| format!("Failed to parse lsn from {}", parameters[3]))?;
+        Ok(Self {
+            tenant_shard_id,
+            timeline_id,
+            lease_id,
+            lsn: standby_horizon,
+        })
+    }
+}
+
 impl PageServiceCmd {
     fn parse(query: &str) -> anyhow::Result<Self> {
         let query = query.trim();
@@ -2898,6 +2983,10 @@ impl PageServiceCmd {
             let cmd2 = cmd2.to_ascii_lowercase();
             if cmd2 == "lsn" {
                 Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
+            } else if cmd2 == "standby_horizon" {
+                Ok(Self::LeaseStandbyHorizon(LeaseStandbyHorizonCmd::parse(
+                    other,
+                )?))
             } else {
                 bail!("invalid lease command: {cmd}");
             }
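Putting the parser together: over libpq, a replica issues the lease as a simple query of four whitespace-separated parameters after the `lease standby_horizon` prefix. A small sketch of the tokenization (the tenant id, timeline id, lease id, and LSN below are all made-up values):

fn main() {
    // Hypothetical command string, shaped like the one attempt_one_libpq builds.
    let query = "lease standby_horizon 3a0d52d8a7b5c4e1f2091827364554ab 6ec1eeb755cc7cd670547fbe3ce7bd7e v1-compute-4242 0/2E8F1C8";
    let tokens: Vec<&str> = query.split_whitespace().collect();
    assert_eq!(tokens[0], "lease");
    assert_eq!(tokens[1], "standby_horizon");
    // LeaseStandbyHorizonCmd::parse receives the remaining four parameters:
    // tenant_shard_id, timeline_id, lease_id, lsn.
    assert_eq!(tokens.len() - 2, 4);
    println!("parsed ok");
}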
@@ -3161,6 +3250,45 @@ where
                     }
                 };
             }
+            PageServiceCmd::LeaseStandbyHorizon(LeaseStandbyHorizonCmd {
+                tenant_shard_id,
+                timeline_id,
+                lease_id,
+                lsn,
+            }) => {
+                tracing::Span::current()
+                    .record("tenant_id", field::display(tenant_shard_id))
+                    .record("timeline_id", field::display(timeline_id));
+
+                self.check_permission(Some(tenant_shard_id.tenant_id))?;
+
+                COMPUTE_COMMANDS_COUNTERS
+                    .for_command(ComputeCommandKind::LeaseStandbyHorizon)
+                    .inc();
+
+                match self
+                    .handle_lease_standby_horizon(
+                        pgb,
+                        tenant_shard_id,
+                        timeline_id,
+                        lease_id,
+                        lsn,
+                        &ctx,
+                    )
+                    .await
+                {
+                    Ok(()) => {
+                        pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
+                    }
+                    Err(e) => {
+                        error!("error obtaining standby_horizon lease for {lsn}: {e:?}");
+                        pgb.write_message_noflush(&BeMessage::ErrorResponse(
+                            &e.to_string(),
+                            Some(e.pg_error_code()),
+                        ))?
+                    }
+                };
+            }
         }
 
         Ok(())
@@ -3801,6 +3929,28 @@ impl proto::PageService for GrpcPageServiceHandler {
 
         Ok(tonic::Response::new(expires.into()))
     }
+
+    #[instrument(skip_all, fields(lease_id, lsn))]
+    async fn lease_standby_horizon(
+        &self,
+        req: tonic::Request<proto::LeaseStandbyHorizonRequest>,
+    ) -> Result<tonic::Response<proto::LeaseStandbyHorizonResponse>, tonic::Status> {
+        let timeline = self.get_request_timeline(&req).await?;
+        let ctx = self.ctx.with_scope_timeline(&timeline);
+
+        // Validate and convert the request, and decorate the span.
+        let page_api::LeaseStandbyHorizonRequest { lease_id, lsn } = req.into_inner().try_into()?;
+
+        span_record!(lease_id=%lease_id, lsn=%lsn);
+
+        // Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
+        let expiration = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) {
+            Ok(expiration) => expiration,
+            Err(err) => return Err(tonic::Status::failed_precondition(format!("{err:#}"))),
+        };
+
+        Ok(tonic::Response::new(expiration.into()))
+    }
 }
 
 /// gRPC middleware layer that handles observability concerns:
@@ -1860,6 +1860,15 @@ impl Timeline {
         Ok(lease)
     }
 
+    pub(crate) fn lease_standby_horizon(
+        &self,
+        lease_id: String,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<SystemTime> {
+        todo!()
+    }
+
     /// Freeze the current open in-memory layer. It will be written to disk on next iteration.
     /// Returns the flush request ID which can be awaited with wait_flush_completion().
     #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
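The pageserver side is still a todo!(). Purely to illustrate the contract the callers above rely on (upsert keyed by lease_id, per-lease monotonic LSN, an expiration in the response, and GC clamped to the minimum live lease), here is a standalone sketch; the names and the lease length are assumptions, not Neon's implementation:

use std::collections::HashMap;
use std::time::{Duration, SystemTime};

// Illustrative only: an in-memory lease table keyed by lease_id.
struct StandbyHorizonLeases {
    leases: HashMap<String, (u64 /* lsn */, SystemTime /* expiration */)>,
    lease_length: Duration, // assumed configuration value
}

impl StandbyHorizonLeases {
    fn upsert(&mut self, lease_id: String, lsn: u64, now: SystemTime) -> anyhow::Result<SystemTime> {
        if let Some((prev_lsn, _)) = self.leases.get(&lease_id) {
            // The proto comment requires monotonic LSNs per lease.
            anyhow::ensure!(lsn >= *prev_lsn, "lsn moved backwards: {prev_lsn} -> {lsn}");
        }
        let expiration = now + self.lease_length;
        self.leases.insert(lease_id, (lsn, expiration));
        Ok(expiration)
    }

    // GC must not advance past the minimum unexpired leased LSN.
    fn horizon(&self, now: SystemTime) -> Option<u64> {
        self.leases
            .values()
            .filter(|(_, exp)| *exp > now)
            .map(|(lsn, _)| *lsn)
            .min()
    }
}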