This commit is contained in:
Christian Schwarz
2025-07-20 18:51:36 +00:00
parent ca82b739d3
commit 0d2c100048
6 changed files with 374 additions and 54 deletions

View File

@@ -5,10 +5,13 @@ use std::{
time::{Duration, Instant, SystemTime},
};
use anyhow::Context;
use chrono::Utc;
use compute_api::spec::PageserverProtocol;
use futures::{StreamExt, stream::FuturesUnordered};
use postgres::SimpleQueryMessage;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info, info_span, warn};
use tracing::{Instrument, error, info, info_span, instrument, warn};
use utils::{backoff::retry, id::TimelineId, lsn::Lsn};
use crate::{
@@ -47,57 +50,87 @@ pub fn spawn_bg_task(compute: Arc<ComputeNode>) {
#[derive(Clone)]
struct Reservation {
horizon: Lsn,
expiration: Instant,
expiration: SystemTime,
}
#[instrument(name = "standby_horizon_lease", skip_all, fields(lease_id))]
async fn bg_task(compute: Arc<ComputeNode>) {
// Use a lease_id that is globally unique to this process to maximize attribution precision & log correlation.
let lease_id = format!("v1-{}-{}", compute.params.compute_id, std::process::id());
tracing::Span::current().record("lease_id", tracing::field::display(&lease_id));
// Wait until we have the first value.
// Allows us to simply .unwrap() later because it never transitions back to None.
info!("waiting for first lease lsn to be fetched from postgres");
let mut min_inflight_request_lsn_changed =
compute.ro_replica.min_inflight_request_lsn.subscribe();
min_inflight_request_lsn_changed.mark_changed();
min_inflight_request_lsn_changed.mark_changed(); // it could have been set already
min_inflight_request_lsn_changed
.wait_for(|value| value.is_some())
.await;
let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(());
// React to connstring changes. Sadly there is no async API for this yet.
let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(None);
std::thread::spawn({
let compute = Arc::clone(&compute);
move || {
loop {
compute
.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_secs(todo!()));
connstr_watch_tx.send_replace(());
compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::MAX);
let new = compute
.state
.lock()
.unwrap()
.pspec
.as_ref()
.and_then(|pspec| pspec.spec.pageserver_connstring.clone());
connstr_watch_tx.send_if_modified(|existing| {
if &new != existing {
*existing = new;
true
} else {
false
}
});
}
}
});
let mut reservation = Reservation {
horizon: Lsn(0),
expiration: Instant::now(),
let mut obtained = ObtainedLease {
lsn: Lsn(0),
nearest_expiration: SystemTime::UNIX_EPOCH,
};
loop {
let valid_duration = obtained
.nearest_expiration
.duration_since(SystemTime::now())
.unwrap_or_default();
// Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
let sleep_duration = valid_duration
.saturating_sub(Duration::from_secs(60))
.max(valid_duration / 2);
tokio::select! {
_ = tokio::time::sleep_until(reservation.expiration.into()) => {
info!("updating due to expiration");
_ = tokio::time::sleep(sleep_duration) => {
info!("updating because lease is going to expire soon");
}
_ = connstr_watch_rx.changed() => {
info!("updating due to changed pageserver_connstr")
}
_ = async {
// debounce TODO make this lower in tests
// debounce; TODO make this lower in tests
tokio::time::sleep(Duration::from_secs(10));
// every 10 GiB TODO make this tighter in tests?
// every 10 GiB; TODO make this tighter in tests?
let max_horizon_lag = 10 * (1<<30);
min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > reservation.horizon.0 + max_horizon_lag).await
min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > obtained.lsn.0 + max_horizon_lag).await
} => {
info!(%reservation.horizon, "updating due to max horizon lag");
info!(%obtained.lsn, "updating due to max horizon lag");
}
}
// retry forever
let compute = Arc::clone(&compute);
reservation = retry(
|| attempt(&compute),
let lease_id = lease_id.clone();
obtained = retry(
|| attempt(lease_id.clone(), &compute),
|_| false,
0,
u32::MAX, // forever
@@ -111,8 +144,12 @@ async fn bg_task(compute: Arc<ComputeNode>) {
}
}
// Returns expiration time
async fn attempt(compute: &Arc<ComputeNode>) -> anyhow::Result<Reservation> {
struct ObtainedLease {
lsn: Lsn,
nearest_expiration: SystemTime,
}
async fn attempt(lease_id: String, compute: &Arc<ComputeNode>) -> anyhow::Result<ObtainedLease> {
let (shards, timeline_id) = {
let state = compute.state.lock().unwrap();
let pspec = state.pspec.as_ref().expect("spec must be set");
@@ -124,6 +161,7 @@ async fn attempt(compute: &Arc<ComputeNode>) -> anyhow::Result<Reservation> {
.min_inflight_request_lsn
.borrow()
.expect("we only call this function once it has been transitioned to Some");
let mut futs = FuturesUnordered::new();
for connect_info in shards {
let logging_span = info_span!(
@@ -132,42 +170,54 @@ async fn attempt(compute: &Arc<ComputeNode>) -> anyhow::Result<Reservation> {
shard_id=%connect_info.tenant_shard_id.shard_slug(),
timeline_id=%timeline_id,
);
let logging_wrapper = |fut: Pin<Box<dyn Future<Output = anyhow::Result<Reservation>>>>| {
async move {
match fut.await {
Ok(v) => Ok(v),
Err(err) => {
error!(
"failed to advance standby_horizon, communicator reads from this shard may star failing: {err:?}"
);
Err(())
let logging_wrapper =
|fut: Pin<Box<dyn Future<Output = anyhow::Result<Option<SystemTime>>>>>| {
async move {
// TODO: timeout?
match fut.await {
Ok(Some(v)) => {
info!("lease obtained");
Ok(Some(v))
}
Ok(None) => {
error!("pageserver rejected our request");
Ok(None)
}
Err(err) => {
error!("communication failure: {err:?}");
Err(())
}
}
}
}}.instrument(logging_span)
};
.instrument(logging_span)
};
let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? {
PageserverProtocol::Libpq => {
logging_wrapper(Box::pin(attempt_one_libpq(connect_info, timeline_id, lsn)))
}
PageserverProtocol::Grpc => {
logging_wrapper(Box::pin(attempt_one_grpc(connect_info, timeline_id, lsn)))
}
PageserverProtocol::Libpq => logging_wrapper(Box::pin(attempt_one_libpq(
connect_info,
timeline_id,
lease_id.clone(),
lsn,
))),
PageserverProtocol::Grpc => logging_wrapper(Box::pin(attempt_one_grpc(
connect_info,
timeline_id,
lease_id.clone(),
lsn,
))),
};
futs.push(fut);
}
let mut errors = 0;
let mut min = None;
let mut nearest_expiration = None;
while let Some(res) = futs.next().await {
match res {
Ok(reservation) => {
let Reservation {
horizon,
expiration,
} = min.get_or_insert_with(|| reservation.clone());
*horizon = std::cmp::min(*horizon, reservation.horizon);
*expiration = std::cmp::min(*expiration, reservation.expiration);
Ok(Some(expiration)) => {
let nearest_expiration = nearest_expiration.get_or_insert(expiration);
*nearest_expiration = std::cmp::min(*nearest_expiration, expiration);
}
Err(()) => {
Ok(None) | Err(()) => {
// the logging wrapper does the logging
errors += 1;
}
}
}
@@ -176,8 +226,11 @@ async fn attempt(compute: &Arc<ComputeNode>) -> anyhow::Result<Reservation> {
"failed to advance standby_horizon for {errors} shards, check logs for details"
));
}
match min {
Some(min) => Ok(min),
match nearest_expiration {
Some(v) => Ok(ObtainedLease {
lsn,
nearest_expiration: nearest_expiration.expect("we either errors+=1 or set it"),
}),
None => Err(anyhow::anyhow!("pageservers connstrings is empty")), // this probably can't happen
}
}
@@ -185,21 +238,67 @@ async fn attempt(compute: &Arc<ComputeNode>) -> anyhow::Result<Reservation> {
async fn attempt_one_libpq(
connect_info: ConnectInfo,
timeline_id: TimelineId,
lease_id: String,
lsn: Lsn,
) -> anyhow::Result<Reservation> {
) -> anyhow::Result<Option<SystemTime>> {
let ConnectInfo {
tenant_shard_id,
connstring,
auth,
} = connect_info;
tokio_postgres::Config::from_str(&connstring)?;
todo!()
let mut config = tokio_postgres::Config::from_str(&connstring)?;
if let Some(auth) = auth {
config.password(auth);
}
let (mut client, conn) = config.connect(postgres::NoTls).await?;
tokio::spawn(conn);
let cmd = format!("lease standby_horizon {tenant_shard_id} {timeline_id} {lease_id} {lsn} ");
let res = client.simple_query(&cmd).await?;
let msg = match res.first() {
Some(msg) => msg,
None => anyhow::bail!("empty response"),
};
let row = match msg {
SimpleQueryMessage::Row(row) => row,
_ => anyhow::bail!("expected row message type"),
};
// Note: this will be None if a lease is explicitly not granted.
let Some(expiration) = row.get("expiration") else {
return Ok(None);
};
let expiration =
SystemTime::UNIX_EPOCH.checked_add(Duration::from_millis(u64::from_str(expiration)?));
Ok(expiration)
}
async fn attempt_one_grpc(
connect_info: ConnectInfo,
timeline_id: TimelineId,
lease_id: String,
lsn: Lsn,
) -> anyhow::Result<Reservation> {
todo!()
) -> anyhow::Result<Option<SystemTime>> {
let ConnectInfo {
tenant_shard_id,
connstring,
auth,
} = connect_info;
let mut client = pageserver_page_api::Client::connect(
connstring.to_string(),
tenant_shard_id.tenant_id,
timeline_id,
tenant_shard_id.to_index(),
auth.map(String::from),
None,
)
.await?;
let req = pageserver_page_api::LeaseStandbyHorizonRequest { lease_id, lsn };
match client.lease_standby_horizon(req).await {
Ok(expires) => Ok(Some(expires)),
// Lease couldn't be acquired
Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
Err(err) => Err(err.into()),
}
}

View File

@@ -70,6 +70,18 @@ service PageService {
// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
// collect the LSN until the lease expires. Must be acquired on all relevant shards.
rpc LeaseLsn (LeaseLsnRequest) returns (LeaseLsnResponse);
// Upserts a standby_horizon lease. RO replicas rely on this type of lease.
// In slightly more detail: RO replicas always lag to some degree behind the
// primary, and request pages at their respective apply LSN. The standby horizon mechanism
// ensures that the Pageserver does not garbage-collect old page versions in
// the interval between `min(valid standby horizon leases)` and the most recent page version.
//
// Each RO replica calls this method continuously as it applies more WAL.
// It identifies its lease through an opaque "lease_id" across these requests.
// The response contains the lease expiration time.
// Status `FailedPrecondition` is returned if the lease cannot be granted.
rpc LeaseStandbyHorizon(LeaseStandbyHorizonRequest) returns (LeaseStandbyHorizonResponse);
}
// The LSN a request should read at.
@@ -272,3 +284,16 @@ message LeaseLsnResponse {
// The lease expiration time.
google.protobuf.Timestamp expires = 1;
}
// Request for LeaseStandbyHorizon rpc.
// The lease_id identifies the lease in subsequent requests.
// The lsn must be monotonic; the request will fail if it is not.
message LeaseStandbyHorizonRequest {
string lease_id = 1;
uint64 lsn = 2;
}
// Response for the success case of LeaseStandbyHorizon rpc.
message LeaseStandbyHorizonResponse {
google.protobuf.Timestamp expiration = 1;
}

View File

@@ -143,6 +143,12 @@ impl Client {
let resp = self.inner.lease_lsn(req).await?.into_inner();
Ok(resp.try_into()?)
}
/// Upserts a standby_horizon lease on the pageserver and returns the lease
/// expiration time. See the `LeaseStandbyHorizon` rpc docs for semantics.
pub async fn lease_standby_horizon(
    &mut self,
    req: LeaseStandbyHorizonRequest,
) -> tonic::Result<LeaseStandbyHorizonResponse> {
    let proto_req = proto::LeaseStandbyHorizonRequest::from(req);
    let proto_resp = self
        .inner
        .lease_standby_horizon(proto_req)
        .await?
        .into_inner();
    Ok(proto_resp.try_into()?)
}
}
/// Adds authentication metadata to gRPC requests.

View File

@@ -755,3 +755,64 @@ impl From<LeaseLsnResponse> for proto::LeaseLsnResponse {
}
}
}
/// Domain-level request for the `LeaseStandbyHorizon` rpc.
pub struct LeaseStandbyHorizonRequest {
    // Opaque identifier that ties together repeated lease requests from the
    // same replica; the pageserver uses it to upsert rather than create leases.
    pub lease_id: String,
    // The standby's apply LSN; page versions at or above this LSN must be
    // protected from garbage collection while the lease is valid.
    pub lsn: Lsn,
}
impl TryFrom<proto::LeaseStandbyHorizonRequest> for LeaseStandbyHorizonRequest {
    type Error = ProtocolError;

    /// Validates and converts the wire-format request into the domain type.
    fn try_from(pb: proto::LeaseStandbyHorizonRequest) -> Result<Self, Self::Error> {
        // proto3 cannot distinguish "unset" from 0, and Lsn(0) is never a
        // meaningful lease position, so treat 0 as a missing field.
        if pb.lsn == 0 {
            return Err(ProtocolError::Missing("lsn"));
        }
        // An empty lease_id could not identify the lease in later requests.
        if pb.lease_id.is_empty() {
            return Err(ProtocolError::Invalid("lease_id", pb.lease_id));
        }
        Ok(Self {
            lease_id: pb.lease_id,
            lsn: Lsn(pb.lsn),
        })
    }
}
// Infallible conversion back to the wire format; the inverse of the
// `TryFrom` above (minus validation).
impl From<LeaseStandbyHorizonRequest> for proto::LeaseStandbyHorizonRequest {
    fn from(request: LeaseStandbyHorizonRequest) -> Self {
        Self {
            lease_id: request.lease_id,
            lsn: request.lsn.0,
        }
    }
}
/// Lease expiration time. If the lease could not be granted because the LSN has already been
/// garbage collected, a FailedPrecondition status will be returned instead.
pub type LeaseStandbyHorizonResponse = SystemTime;
// Decodes the wire-format expiration timestamp into a `SystemTime`.
impl TryFrom<proto::LeaseStandbyHorizonResponse> for LeaseStandbyHorizonResponse {
    type Error = ProtocolError;

    fn try_from(pb: proto::LeaseStandbyHorizonResponse) -> Result<Self, Self::Error> {
        let expiration = pb.expiration.ok_or(ProtocolError::Missing("expiration"))?;
        // NOTE(review): `as u64` / `as u32` wrap for negative `seconds`/`nanos`,
        // so a malformed (pre-epoch) timestamp would silently decode as a
        // far-future expiration instead of an error — consider `u64::try_from`
        // here. TODO confirm the server can never send negative values.
        UNIX_EPOCH
            .checked_add(Duration::new(
                expiration.seconds as u64,
                expiration.nanos as u32,
            ))
            .ok_or_else(|| ProtocolError::invalid("expiration", expiration))
    }
}
// Encodes a `SystemTime` expiration as a protobuf `Timestamp`.
impl From<LeaseStandbyHorizonResponse> for proto::LeaseStandbyHorizonResponse {
    fn from(response: LeaseStandbyHorizonResponse) -> Self {
        // A pre-epoch time clamps to the epoch (zero duration).
        let since_epoch = response.duration_since(UNIX_EPOCH).unwrap_or_default();
        let timestamp = prost_types::Timestamp {
            seconds: since_epoch.as_secs() as i64,
            nanos: since_epoch.subsec_nanos() as i32,
        };
        Self {
            expiration: Some(timestamp),
        }
    }
}

View File

@@ -2318,6 +2318,7 @@ pub(crate) enum ComputeCommandKind {
Basebackup,
Fullbackup,
LeaseLsn,
LeaseStandbyHorizon,
}
pub(crate) struct ComputeCommandCounters {

View File

@@ -76,6 +76,7 @@ use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
debug_assert_current_span_has_tenant_id,
};
use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
use crate::tenant::mgr::{
@@ -2218,7 +2219,7 @@ impl PageServerHandler {
valid_until_str.as_deref().unwrap_or("<unknown>")
);
let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
let bytes: Option<&[u8]> = valid_until_str.as_ref().map(|x| x.as_bytes());
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"valid_until",
@@ -2228,6 +2229,51 @@ impl PageServerHandler {
Ok(())
}
// Handles the libpq `lease standby_horizon <tenant_shard_id> <timeline_id>
// <lease_id> <lsn>` command: resolves the timeline for the given shard and
// replies with a single-row result whose `expiration` column is the lease
// expiration in milliseconds since the Unix epoch, or NULL if the lease was
// not granted. The actual lease upsert is still unimplemented (`todo!()`).
#[instrument(skip_all, fields(shard_id, %lsn))]
async fn handle_lease_standby_horizon<IO>(
    &mut self,
    pgb: &mut PostgresBackend<IO>,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    lease_id: String,
    lsn: Lsn,
    ctx: &RequestContext,
) -> Result<(), QueryError>
where
    IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
    debug_assert_current_span_has_tenant_id();
    // Resolve a handle to the exact shard named in the command.
    let timeline = self
        .timeline_handles
        .as_mut()
        .unwrap()
        .get(
            tenant_shard_id.tenant_id,
            timeline_id,
            ShardSelector::Known(tenant_shard_id.to_index()),
        )
        .await?;
    set_tracing_field_shard_id(&timeline);
    // TODO: perform the actual lease upsert; None means "lease not granted".
    let result: Option<SystemTime> = todo!();
    // Encode result as Option<millis since epoch>
    let bytes = result.map(|t| {
        t.duration_since(SystemTime::UNIX_EPOCH)
            .expect("we wouldn't allow a lease at epoch, system time would be horribly off")
            .as_millis()
            .to_string()
            .into_bytes()
    });
    // One text column named "expiration"; NULL when no lease was granted.
    pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
        b"expiration",
    )]))?
    .write_message_noflush(&BeMessage::DataRow(&[bytes.as_deref()]))?;
    Ok(())
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_rel_exists_request(
timeline: &Timeline,
@@ -2718,6 +2764,14 @@ struct LeaseLsnCmd {
lsn: Lsn,
}
#[derive(Debug, Clone, Eq, PartialEq)]
// Parsed form of the libpq `lease standby_horizon` command.
struct LeaseStandbyHorizonCmd {
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    // Opaque id correlating repeated lease requests from the same replica.
    lease_id: String,
    lsn: Lsn,
}
#[derive(Debug, Clone, Eq, PartialEq)]
enum PageServiceCmd {
Set,
@@ -2725,6 +2779,7 @@ enum PageServiceCmd {
BaseBackup(BaseBackupCmd),
FullBackup(FullBackupCmd),
LeaseLsn(LeaseLsnCmd),
LeaseStandbyHorizon(LeaseStandbyHorizonCmd),
}
impl PageStreamCmd {
@@ -2874,6 +2929,31 @@ impl LeaseLsnCmd {
}
}
impl LeaseStandbyHorizonCmd {
    /// Parses the argument list of a `lease standby_horizon` command:
    /// `<tenant_shard_id> <timeline_id> <lease_id> <lsn>`.
    fn parse(query: &str) -> anyhow::Result<Self> {
        let parameters = query.split_whitespace().collect_vec();
        if parameters.len() != 4 {
            // Fixed: error message previously said "lease lsn command"
            // (copy-paste from LeaseLsnCmd::parse).
            bail!(
                "invalid number of parameters for lease standby_horizon command: {}",
                query
            );
        }
        let tenant_shard_id = TenantShardId::from_str(parameters[0])
            .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
        let timeline_id = TimelineId::from_str(parameters[1])
            .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
        // The lease_id is opaque to the pageserver; no validation beyond presence.
        let lease_id = parameters[2].to_string();
        // Fixed: error context previously echoed parameters[2] (the lease_id)
        // instead of the lsn argument it failed to parse.
        let standby_horizon = Lsn::from_str(parameters[3])
            .with_context(|| format!("Failed to parse lsn from {}", parameters[3]))?;
        Ok(Self {
            tenant_shard_id,
            timeline_id,
            lease_id,
            lsn: standby_horizon,
        })
    }
}
impl PageServiceCmd {
fn parse(query: &str) -> anyhow::Result<Self> {
let query = query.trim();
@@ -2898,6 +2978,10 @@ impl PageServiceCmd {
let cmd2 = cmd2.to_ascii_lowercase();
if cmd2 == "lsn" {
Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
} else if cmd2 == "standby_horizon" {
Ok(Self::LeaseStandbyHorizon(LeaseStandbyHorizonCmd::parse(
other,
)?))
} else {
bail!("invalid lease command: {cmd}");
}
@@ -3161,6 +3245,45 @@ where
}
};
}
PageServiceCmd::LeaseStandbyHorizon(LeaseStandbyHorizonCmd {
tenant_shard_id,
timeline_id,
lease_id,
lsn,
}) => {
tracing::Span::current()
.record("tenant_id", field::display(tenant_shard_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_shard_id.tenant_id))?;
COMPUTE_COMMANDS_COUNTERS
.for_command(ComputeCommandKind::LeaseStandbyHorizon)
.inc();
match self
.handle_lease_standby_horizon(
pgb,
tenant_shard_id,
timeline_id,
lease_id,
lsn,
&ctx,
)
.await
{
Ok(()) => {
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
}
Err(e) => {
error!("error obtaining standby_horizon lease for {lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(
&e.to_string(),
Some(e.pg_error_code()),
))?
}
};
}
}
Ok(())
@@ -3801,6 +3924,11 @@ impl proto::PageService for GrpcPageServiceHandler {
Ok(tonic::Response::new(expires.into()))
}
// gRPC entry point for the LeaseStandbyHorizon rpc.
// TODO: not implemented yet; callers currently receive a panic-derived error.
// The libpq path (`handle_lease_standby_horizon`) is the reference behavior.
#[instrument(skip_all, fields(lease_id, lsn))]
async fn lease_standby_horizon(&self, req: tonic::Request<proto::LeaseStandbyHorizonRequest>) -> Result<tonic::Response<proto::LeaseStandbyHorizonResponse>, tonic::Status> {
    todo!()
}
}
/// gRPC middleware layer that handles observability concerns: