pageserver: add gRPC LSN lease support (#12384)

## Problem

The gRPC API does not provide LSN leases.

## Summary of changes

* Add LSN lease support to the gRPC API.
* Use gRPC LSN leases for static computes with `grpc://` connstrings.
* Move `PageserverProtocol` into the `compute_api::spec` module and
reuse it.
This commit is contained in:
Erik Grinaker
2025-06-30 14:44:17 +02:00
committed by GitHub
parent a384d7d501
commit d0a4ae3e8f
15 changed files with 281 additions and 113 deletions

13
Cargo.lock generated
View File

@@ -1279,6 +1279,7 @@ dependencies = [
"remote_storage", "remote_storage",
"serde", "serde",
"serde_json", "serde_json",
"url",
"utils", "utils",
] ]
@@ -4480,6 +4481,7 @@ dependencies = [
"pageserver_api", "pageserver_api",
"postgres_ffi_types", "postgres_ffi_types",
"prost 0.13.5", "prost 0.13.5",
"prost-types 0.13.5",
"strum", "strum",
"strum_macros", "strum_macros",
"thiserror 1.0.69", "thiserror 1.0.69",
@@ -5157,7 +5159,7 @@ dependencies = [
"petgraph", "petgraph",
"prettyplease", "prettyplease",
"prost 0.13.5", "prost 0.13.5",
"prost-types 0.13.3", "prost-types 0.13.5",
"regex", "regex",
"syn 2.0.100", "syn 2.0.100",
"tempfile", "tempfile",
@@ -5200,9 +5202,9 @@ dependencies = [
[[package]] [[package]]
name = "prost-types" name = "prost-types"
version = "0.13.3" version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
dependencies = [ dependencies = [
"prost 0.13.5", "prost 0.13.5",
] ]
@@ -6809,6 +6811,7 @@ dependencies = [
"chrono", "chrono",
"clap", "clap",
"clashmap", "clashmap",
"compute_api",
"control_plane", "control_plane",
"cron", "cron",
"diesel", "diesel",
@@ -7642,7 +7645,7 @@ dependencies = [
"prettyplease", "prettyplease",
"proc-macro2", "proc-macro2",
"prost-build 0.13.3", "prost-build 0.13.3",
"prost-types 0.13.3", "prost-types 0.13.5",
"quote", "quote",
"syn 2.0.100", "syn 2.0.100",
] ]
@@ -7654,7 +7657,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1" checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1"
dependencies = [ dependencies = [
"prost 0.13.5", "prost 0.13.5",
"prost-types 0.13.3", "prost-types 0.13.5",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic 0.13.1", "tonic 0.13.1",

View File

@@ -152,6 +152,7 @@ pprof = { version = "0.14", features = ["criterion", "flamegraph", "frame-pointe
procfs = "0.16" procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13.5" prost = "0.13.5"
prost-types = "0.13.5"
rand = "0.8" rand = "0.8"
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] } redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2" regex = "1.10.2"

View File

@@ -1,4 +1,4 @@
use anyhow::{Context, Result, anyhow}; use anyhow::{Context, Result};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege; use compute_api::privilege::Privilege;
use compute_api::responses::{ use compute_api::responses::{
@@ -6,7 +6,7 @@ use compute_api::responses::{
LfcPrewarmState, TlsConfig, LfcPrewarmState, TlsConfig,
}; };
use compute_api::spec::{ use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
}; };
use futures::StreamExt; use futures::StreamExt;
use futures::future::join_all; use futures::future::join_all;
@@ -1003,19 +1003,12 @@ impl ComputeNode {
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> { fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set"); let spec = compute_state.pspec.as_ref().expect("spec must be set");
// Detect the protocol scheme. If the URL doesn't have a scheme, assume libpq.
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap(); let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let scheme = match Url::parse(shard0_connstr) {
Ok(url) => url.scheme().to_lowercase().to_string(),
Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(),
Err(err) => return Err(anyhow!("invalid connstring URL: {err}")),
};
let started = Instant::now(); let started = Instant::now();
let (connected, size) = match scheme.as_str() {
"postgresql" | "postgres" => self.try_get_basebackup_libpq(spec, lsn)?, let (connected, size) = match PageserverProtocol::from_connstring(shard0_connstr)? {
"grpc" => self.try_get_basebackup_grpc(spec, lsn)?, PageserverProtocol::Libpq => self.try_get_basebackup_libpq(spec, lsn)?,
scheme => return Err(anyhow!("unknown URL scheme {scheme}")), PageserverProtocol::Grpc => self.try_get_basebackup_grpc(spec, lsn)?,
}; };
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();

View File

@@ -4,7 +4,9 @@ use std::thread;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use anyhow::{Result, bail}; use anyhow::{Result, bail};
use compute_api::spec::ComputeMode; use compute_api::spec::{ComputeMode, PageserverProtocol};
use itertools::Itertools as _;
use pageserver_page_api as page_api;
use postgres::{NoTls, SimpleQueryMessage}; use postgres::{NoTls, SimpleQueryMessage};
use tracing::{info, warn}; use tracing::{info, warn};
use utils::id::{TenantId, TimelineId}; use utils::id::{TenantId, TimelineId};
@@ -76,25 +78,17 @@ fn acquire_lsn_lease_with_retry(
loop { loop {
// Note: List of pageservers is dynamic, need to re-read configs before each attempt. // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
let configs = { let (connstrings, auth) = {
let state = compute.state.lock().unwrap(); let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set"); let spec = state.pspec.as_ref().expect("spec must be set");
(
let conn_strings = spec.pageserver_connstr.split(','); spec.pageserver_connstr.clone(),
spec.storage_auth_token.clone(),
conn_strings )
.map(|connstr| {
let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
if let Some(storage_auth_token) = &spec.storage_auth_token {
config.password(storage_auth_token.clone());
}
config
})
.collect::<Vec<_>>()
}; };
let result = try_acquire_lsn_lease(tenant_id, timeline_id, lsn, &configs); let result =
try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
match result { match result {
Ok(Some(res)) => { Ok(Some(res)) => {
return Ok(res); return Ok(res);
@@ -116,68 +110,104 @@ fn acquire_lsn_lease_with_retry(
} }
} }
/// Tries to acquire an LSN lease through PS page_service API. /// Tries to acquire LSN leases on all Pageserver shards.
fn try_acquire_lsn_lease( fn try_acquire_lsn_lease(
connstrings: &str,
auth: Option<&str>,
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
lsn: Lsn, lsn: Lsn,
configs: &[postgres::Config],
) -> Result<Option<SystemTime>> { ) -> Result<Option<SystemTime>> {
fn get_valid_until( let connstrings = connstrings.split(',').collect_vec();
config: &postgres::Config, let shard_count = connstrings.len();
tenant_shard_id: TenantShardId, let mut leases = Vec::new();
timeline_id: TimelineId,
lsn: Lsn, for (shard_number, &connstring) in connstrings.iter().enumerate() {
) -> Result<Option<SystemTime>> { let tenant_shard_id = match shard_count {
let mut client = config.connect(NoTls)?; 0 | 1 => TenantShardId::unsharded(tenant_id),
let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} "); shard_count => TenantShardId {
let res = client.simple_query(&cmd)?; tenant_id,
let msg = match res.first() { shard_number: ShardNumber(shard_number as u8),
Some(msg) => msg, shard_count: ShardCount::new(shard_count as u8),
None => bail!("empty response"), },
};
let row = match msg {
SimpleQueryMessage::Row(row) => row,
_ => bail!("error parsing lsn lease response"),
}; };
// Note: this will be None if a lease is explicitly not granted. let lease = match PageserverProtocol::from_connstring(connstring)? {
let valid_until_str = row.get("valid_until"); PageserverProtocol::Libpq => {
acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
let valid_until = valid_until_str.map(|s| { }
SystemTime::UNIX_EPOCH PageserverProtocol::Grpc => {
.checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64)) acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
.expect("Time larger than max SystemTime could handle") }
}); };
Ok(valid_until) leases.push(lease);
} }
let shard_count = configs.len(); Ok(leases.into_iter().min().flatten())
}
let valid_until = if shard_count > 1 { /// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
configs /// postgresql:// scheme.
.iter() fn acquire_lsn_lease_libpq(
.enumerate() connstring: &str,
.map(|(shard_number, config)| { auth: Option<&str>,
let tenant_shard_id = TenantShardId { tenant_shard_id: TenantShardId,
tenant_id, timeline_id: TimelineId,
shard_count: ShardCount::new(shard_count as u8), lsn: Lsn,
shard_number: ShardNumber(shard_number as u8), ) -> Result<Option<SystemTime>> {
}; let mut config = postgres::Config::from_str(connstring)?;
get_valid_until(config, tenant_shard_id, timeline_id, lsn) if let Some(auth) = auth {
}) config.password(auth);
.collect::<Result<Vec<Option<SystemTime>>>>()? }
.into_iter() let mut client = config.connect(NoTls)?;
.min() let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
.unwrap() let res = client.simple_query(&cmd)?;
} else { let msg = match res.first() {
get_valid_until( Some(msg) => msg,
&configs[0], None => bail!("empty response"),
TenantShardId::unsharded(tenant_id), };
timeline_id, let row = match msg {
lsn, SimpleQueryMessage::Row(row) => row,
)? _ => bail!("error parsing lsn lease response"),
}; };
// Note: this will be None if a lease is explicitly not granted.
let valid_until_str = row.get("valid_until");
let valid_until = valid_until_str.map(|s| {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
.expect("Time larger than max SystemTime could handle")
});
Ok(valid_until) Ok(valid_until)
} }
/// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
/// grpc:// scheme.
fn acquire_lsn_lease_grpc(
connstring: &str,
auth: Option<&str>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::new(
connstring.to_string(),
tenant_shard_id.tenant_id,
timeline_id,
tenant_shard_id.to_index(),
auth.map(String::from),
None,
)
.await?;
let req = page_api::LeaseLsnRequest { lsn };
match client.lease_lsn(req).await {
Ok(expires) => Ok(Some(expires)),
// Lease couldn't be acquired because the LSN has been garbage collected.
Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
Err(err) => Err(err.into()),
}
})
}

View File

@@ -16,9 +16,9 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail}; use anyhow::{Context, Result, anyhow, bail};
use clap::Parser; use clap::Parser;
use compute_api::requests::ComputeClaimsScope; use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode; use compute_api::spec::{ComputeMode, PageserverProtocol};
use control_plane::broker::StorageBroker; use control_plane::broker::StorageBroker;
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode, PageserverProtocol}; use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage}; use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::local_env; use control_plane::local_env;
use control_plane::local_env::{ use control_plane::local_env::{

View File

@@ -56,8 +56,8 @@ use compute_api::responses::{
TlsConfig, TlsConfig,
}; };
use compute_api::spec::{ use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent, Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PageserverProtocol,
RemoteExtSpec, Role, PgIdent, RemoteExtSpec, Role,
}; };
use jsonwebtoken::jwk::{ use jsonwebtoken::jwk::{
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations, AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
@@ -373,29 +373,6 @@ impl std::fmt::Display for EndpointTerminateMode {
} }
} }
/// Protocol used to connect to a Pageserver.
#[derive(Clone, Copy, Debug)]
pub enum PageserverProtocol {
Libpq,
Grpc,
}
impl PageserverProtocol {
/// Returns the URL scheme for the protocol, used in connstrings.
pub fn scheme(&self) -> &'static str {
match self {
Self::Libpq => "postgresql",
Self::Grpc => "grpc",
}
}
}
impl Display for PageserverProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.scheme())
}
}
impl Endpoint { impl Endpoint {
fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> { fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
if !entry.file_type()?.is_dir() { if !entry.file_type()?.is_dir() {

View File

@@ -12,6 +12,7 @@ jsonwebtoken.workspace = true
serde.workspace = true serde.workspace = true
serde_json.workspace = true serde_json.workspace = true
regex.workspace = true regex.workspace = true
url.workspace = true
utils = { path = "../utils" } utils = { path = "../utils" }
remote_storage = { version = "0.1", path = "../remote_storage/" } remote_storage = { version = "0.1", path = "../remote_storage/" }

View File

@@ -4,11 +4,14 @@
//! provide it by calling the compute_ctl's `/compute_ctl` endpoint, or //! provide it by calling the compute_ctl's `/compute_ctl` endpoint, or
//! compute_ctl can fetch it by calling the control plane's API. //! compute_ctl can fetch it by calling the control plane's API.
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Display;
use anyhow::anyhow;
use indexmap::IndexMap; use indexmap::IndexMap;
use regex::Regex; use regex::Regex;
use remote_storage::RemotePath; use remote_storage::RemotePath;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url;
use utils::id::{TenantId, TimelineId}; use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn; use utils::lsn::Lsn;
@@ -429,6 +432,47 @@ pub struct JwksSettings {
pub jwt_audience: Option<String>, pub jwt_audience: Option<String>,
} }
/// Protocol used to connect to a Pageserver. Parsed from the connstring scheme.
#[derive(Clone, Copy, Debug, Default)]
pub enum PageserverProtocol {
/// The original protocol based on libpq and COPY. Uses postgresql:// or postgres:// scheme.
#[default]
Libpq,
/// A newer, gRPC-based protocol. Uses grpc:// scheme.
Grpc,
}
impl PageserverProtocol {
/// Parses the protocol from a connstring scheme. Defaults to Libpq if no scheme is given.
/// Errors if the connstring is an invalid URL.
pub fn from_connstring(connstring: &str) -> anyhow::Result<Self> {
let scheme = match Url::parse(connstring) {
Ok(url) => url.scheme().to_lowercase(),
Err(url::ParseError::RelativeUrlWithoutBase) => return Ok(Self::default()),
Err(err) => return Err(anyhow!("invalid connstring URL: {err}")),
};
match scheme.as_str() {
"postgresql" | "postgres" => Ok(Self::Libpq),
"grpc" => Ok(Self::Grpc),
scheme => Err(anyhow!("invalid protocol scheme: {scheme}")),
}
}
/// Returns the URL scheme for the protocol, for use in connstrings.
pub fn scheme(&self) -> &'static str {
match self {
Self::Libpq => "postgresql",
Self::Grpc => "grpc",
}
}
}
impl Display for PageserverProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.scheme())
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::fs::File; use std::fs::File;

View File

@@ -11,6 +11,7 @@ futures.workspace = true
pageserver_api.workspace = true pageserver_api.workspace = true
postgres_ffi_types.workspace = true postgres_ffi_types.workspace = true
prost.workspace = true prost.workspace = true
prost-types.workspace = true
strum.workspace = true strum.workspace = true
strum_macros.workspace = true strum_macros.workspace = true
thiserror.workspace = true thiserror.workspace = true

View File

@@ -35,6 +35,8 @@
syntax = "proto3"; syntax = "proto3";
package page_api; package page_api;
import "google/protobuf/timestamp.proto";
service PageService { service PageService {
// Returns whether a relation exists. // Returns whether a relation exists.
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse); rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
@@ -64,6 +66,10 @@ service PageService {
// Fetches an SLRU segment. // Fetches an SLRU segment.
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse); rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
// collect the LSN until the lease expires. Must be acquired on all relevant shards.
rpc LeaseLsn (LeaseLsnRequest) returns (LeaseLsnResponse);
} }
// The LSN a request should read at. // The LSN a request should read at.
@@ -252,3 +258,17 @@ message GetSlruSegmentRequest {
message GetSlruSegmentResponse { message GetSlruSegmentResponse {
bytes segment = 1; bytes segment = 1;
} }
// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
// collect the LSN until the lease expires. Must be acquired on all relevant shards.
message LeaseLsnRequest {
// The LSN to lease. Can't be 0 or below the current GC cutoff.
uint64 lsn = 1;
}
// Lease acquisition response. If the lease could not be granted because the LSN has already been
// garbage collected, a FailedPrecondition status will be returned instead.
message LeaseLsnResponse {
// The lease expiration time.
google.protobuf.Timestamp expires = 1;
}

View File

@@ -187,4 +187,17 @@ impl Client {
let response = self.client.get_slru_segment(proto_req).await?; let response = self.client.get_slru_segment(proto_req).await?;
Ok(response.into_inner().try_into()?) Ok(response.into_inner().try_into()?)
} }
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
/// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
///
/// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
/// acquired because the LSN has already been garbage collected.
pub async fn lease_lsn(
&mut self,
req: model::LeaseLsnRequest,
) -> Result<model::LeaseLsnResponse, tonic::Status> {
let req = proto::LeaseLsnRequest::from(req);
Ok(self.client.lease_lsn(req).await?.into_inner().try_into()?)
}
} }

View File

@@ -16,6 +16,7 @@
//! stream combinators without dealing with errors, and avoids validating the same message twice. //! stream combinators without dealing with errors, and avoids validating the same message twice.
use std::fmt::Display; use std::fmt::Display;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use bytes::Bytes; use bytes::Bytes;
use postgres_ffi_types::Oid; use postgres_ffi_types::Oid;
@@ -703,3 +704,54 @@ impl From<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
// SlruKind is defined in pageserver_api::reltag. // SlruKind is defined in pageserver_api::reltag.
pub type SlruKind = pageserver_api::reltag::SlruKind; pub type SlruKind = pageserver_api::reltag::SlruKind;
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
/// collect the LSN until the lease expires.
pub struct LeaseLsnRequest {
/// The LSN to lease.
pub lsn: Lsn,
}
impl TryFrom<proto::LeaseLsnRequest> for LeaseLsnRequest {
type Error = ProtocolError;
fn try_from(pb: proto::LeaseLsnRequest) -> Result<Self, Self::Error> {
if pb.lsn == 0 {
return Err(ProtocolError::Missing("lsn"));
}
Ok(Self { lsn: Lsn(pb.lsn) })
}
}
impl From<LeaseLsnRequest> for proto::LeaseLsnRequest {
fn from(request: LeaseLsnRequest) -> Self {
Self { lsn: request.lsn.0 }
}
}
/// Lease expiration time. If the lease could not be granted because the LSN has already been
/// garbage collected, a FailedPrecondition status will be returned instead.
pub type LeaseLsnResponse = SystemTime;
impl TryFrom<proto::LeaseLsnResponse> for LeaseLsnResponse {
type Error = ProtocolError;
fn try_from(pb: proto::LeaseLsnResponse) -> Result<Self, Self::Error> {
let expires = pb.expires.ok_or(ProtocolError::Missing("expires"))?;
UNIX_EPOCH
.checked_add(Duration::new(expires.seconds as u64, expires.nanos as u32))
.ok_or_else(|| ProtocolError::invalid("expires", expires))
}
}
impl From<LeaseLsnResponse> for proto::LeaseLsnResponse {
fn from(response: LeaseLsnResponse) -> Self {
let expires = response.duration_since(UNIX_EPOCH).unwrap_or_default();
Self {
expires: Some(prost_types::Timestamp {
seconds: expires.as_secs() as i64,
nanos: expires.subsec_nanos() as i32,
}),
}
}
}

View File

@@ -14,6 +14,7 @@ use std::{io, str};
use anyhow::{Context as _, bail}; use anyhow::{Context as _, bail};
use bytes::{Buf as _, BufMut as _, BytesMut}; use bytes::{Buf as _, BufMut as _, BytesMut};
use chrono::Utc;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::{FutureExt, Stream}; use futures::{FutureExt, Stream};
use itertools::Itertools; use itertools::Itertools;
@@ -3760,6 +3761,36 @@ impl proto::PageService for GrpcPageServiceHandler {
let resp: page_api::GetSlruSegmentResponse = resp.segment; let resp: page_api::GetSlruSegmentResponse = resp.segment;
Ok(tonic::Response::new(resp.into())) Ok(tonic::Response::new(resp.into()))
} }
#[instrument(skip_all, fields(lsn))]
async fn lease_lsn(
&self,
req: tonic::Request<proto::LeaseLsnRequest>,
) -> Result<tonic::Response<proto::LeaseLsnResponse>, tonic::Status> {
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);
// Validate and convert the request, and decorate the span.
let req: page_api::LeaseLsnRequest = req.into_inner().try_into()?;
span_record!(lsn=%req.lsn);
// Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
let lease_length = timeline.get_lsn_lease_length();
let expires = match timeline.renew_lsn_lease(req.lsn, lease_length, &ctx) {
Ok(lease) => lease.valid_until,
Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))),
};
// TODO: is this spammy? Move it compute-side?
info!(
"acquired lease for {} until {}",
req.lsn,
chrono::DateTime::<Utc>::from(expires).to_rfc3339()
);
Ok(tonic::Response::new(expires.into()))
}
} }
/// gRPC middleware layer that handles observability concerns: /// gRPC middleware layer that handles observability concerns:

View File

@@ -20,6 +20,7 @@ camino.workspace = true
chrono.workspace = true chrono.workspace = true
clap.workspace = true clap.workspace = true
clashmap.workspace = true clashmap.workspace = true
compute_api.workspace = true
cron.workspace = true cron.workspace = true
fail.workspace = true fail.workspace = true
futures.workspace = true futures.workspace = true

View File

@@ -5,7 +5,8 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol}; use compute_api::spec::PageserverProtocol;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv; use control_plane::local_env::LocalEnv;
use futures::StreamExt; use futures::StreamExt;
use hyper::StatusCode; use hyper::StatusCode;