feat(compute_ctl): run lease lsn in tokio and in parallel
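
This moves the LSN-lease renewal job for static computes off its dedicated OS
thread and onto the tokio runtime, and makes it acquire leases from the
pageserver shards in parallel instead of one at a time, capped at 8 in-flight
connections. The libpq path switches from the blocking postgres client to
tokio_postgres, and the gRPC path no longer needs to block on the runtime
handle.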

Signed-off-by: Alex Chi Z <chi@neon.tech>
Author: Alex Chi Z
Date: 2025-07-30 11:47:29 -04:00
Parent: 842a5091d5
Commit: 81d0f5d74f

@@ -1,13 +1,13 @@
 use std::str::FromStr;
 use std::sync::Arc;
-use std::thread;
 use std::time::{Duration, SystemTime};

 use anyhow::{Result, bail};
 use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverProtocol};
+use futures::StreamExt;
 use pageserver_page_api as page_api;
 use postgres::{NoTls, SimpleQueryMessage};
-use tracing::{info, warn};
+use tracing::{Instrument, info, warn};
 use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 use utils::shard::TenantShardId;
@@ -15,7 +15,7 @@ use utils::shard::TenantShardId;
 use crate::compute::ComputeNode;

 /// Spawns a background thread to periodically renew LSN leases for static compute.
-/// Do nothing if the compute is not in static mode.
+/// Do nothing if the compute is not in static mode. MUST run this within a tokio runtime.
 pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
     let (tenant_id, timeline_id, lsn) = {
         let state = compute.state.lock().unwrap();
@@ -28,24 +28,27 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {

     let compute = compute.clone();
     let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
-    thread::spawn(move || {
-        let _entered = span.entered();
-        if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
-            // TODO: might need stronger error feedback than logging a warning.
-            warn!("Exited with error: {e}");
-        }
-    });
+    tokio::spawn(
+        async move {
+            if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn).await {
+                // TODO: might need stronger error feedback than logging a warning.
+                warn!("Exited with error: {e}");
+            }
+        }
+        .instrument(span),
+    );
 }

 /// Renews lsn lease periodically so static compute are not affected by GC.
-fn lsn_lease_bg_task(
+async fn lsn_lease_bg_task(
     compute: Arc<ComputeNode>,
     tenant_id: TenantId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<()> {
     loop {
-        let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
+        let valid_until =
+            acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn).await?;
         let valid_duration = valid_until
             .duration_since(SystemTime::now())
             .unwrap_or(Duration::ZERO);
@@ -65,7 +68,7 @@ fn lsn_lease_bg_task(

 /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
 /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
-fn acquire_lsn_lease_with_retry(
+async fn acquire_lsn_lease_with_retry(
     compute: &Arc<ComputeNode>,
     tenant_id: TenantId,
     timeline_id: TimelineId,
@@ -86,7 +89,8 @@ fn acquire_lsn_lease_with_retry(
             )
         };

-        let result = try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn);
+        let result =
+            try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn).await;
         match result {
             Ok(Some(res)) => {
                 return Ok(res);
@@ -109,15 +113,16 @@ fn acquire_lsn_lease_with_retry(
 }

 /// Tries to acquire LSN leases on all Pageserver shards.
-fn try_acquire_lsn_lease(
+async fn try_acquire_lsn_lease(
     conninfo: PageserverConnectionInfo,
     auth: Option<&str>,
     tenant_id: TenantId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<Option<SystemTime>> {
-    let mut leases = Vec::new();
+    const MAX_CONCURRENT_LEASE_CONNECTIONS: usize = 8;
+    let mut jobs = Vec::new();
     for (shard_index, shard) in conninfo.shards.into_iter() {
         let tenant_shard_id = TenantShardId {
             tenant_id,
@@ -129,46 +134,68 @@ fn try_acquire_lsn_lease(
         // lease on all of them? Currently, that's what we assume, but this is hypothetical
         // as of this writing, as we never pass the info for more than one pageserver per
         // shard.
-        for pageserver in shard.pageservers {
-            let lease = match conninfo.prefer_protocol {
-                PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
-                    &pageserver.grpc_url.unwrap(),
-                    auth,
-                    tenant_shard_id,
-                    timeline_id,
-                    lsn,
-                )?,
-                PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
-                    &pageserver.libpq_url.unwrap(),
-                    auth,
-                    tenant_shard_id,
-                    timeline_id,
-                    lsn,
-                )?,
-            };
-            leases.push(lease);
+        for shard in shard.pageservers {
+            let shard = shard.clone();
+            jobs.push(async move {
+                match conninfo.prefer_protocol {
+                    PageserverProtocol::Grpc => {
+                        acquire_lsn_lease_grpc(
+                            &shard.grpc_url.unwrap(),
+                            auth,
+                            tenant_shard_id,
+                            timeline_id,
+                            lsn,
+                        )
+                        .await
+                    }
+                    PageserverProtocol::Libpq => {
+                        acquire_lsn_lease_libpq(
+                            &shard.libpq_url.unwrap(),
+                            auth,
+                            tenant_shard_id,
+                            timeline_id,
+                            lsn,
+                        )
+                        .await
+                    }
+                }
+            });
         }
     }
-    Ok(leases.into_iter().min().flatten())
+
+    let mut stream = futures::stream::iter(jobs).buffer_unordered(MAX_CONCURRENT_LEASE_CONNECTIONS);
+    let mut leases = Vec::new();
+    while let Some(res) = stream.next().await {
+        let lease = res?;
+        leases.push(lease);
+    }
+    Ok(leases.into_iter().flatten().min())
 }

 /// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
 /// postgresql:// scheme.
-fn acquire_lsn_lease_libpq(
+async fn acquire_lsn_lease_libpq(
     connstring: &str,
     auth: Option<&str>,
     tenant_shard_id: TenantShardId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<Option<SystemTime>> {
-    let mut config = postgres::Config::from_str(connstring)?;
+    let mut config = tokio_postgres::Config::from_str(connstring)?;
     if let Some(auth) = auth {
         config.password(auth);
     }
-    let mut client = config.connect(NoTls)?;
+    let (client, connection) = config.connect(NoTls).await?;
+    tokio::spawn(async move {
+        if let Err(e) = connection.await {
+            tracing::warn!("lease lsn connection error: {}", e);
+        }
+    });

     let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
-    let res = client.simple_query(&cmd)?;
+    let res = client.simple_query(&cmd).await?;
     let msg = match res.first() {
         Some(msg) => msg,
         None => bail!("empty response"),
@@ -186,35 +213,34 @@ fn acquire_lsn_lease_libpq(
             .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
             .expect("Time larger than max SystemTime could handle")
     });
+
     Ok(valid_until)
 }

 /// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
 /// grpc:// scheme.
-fn acquire_lsn_lease_grpc(
+async fn acquire_lsn_lease_grpc(
     connstring: &str,
     auth: Option<&str>,
     tenant_shard_id: TenantShardId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<Option<SystemTime>> {
-    tokio::runtime::Handle::current().block_on(async move {
-        let mut client = page_api::Client::connect(
-            connstring.to_string(),
-            tenant_shard_id.tenant_id,
-            timeline_id,
-            tenant_shard_id.to_index(),
-            auth.map(String::from),
-            None,
-        )
-        .await?;
+    let mut client = page_api::Client::connect(
+        connstring.to_string(),
+        tenant_shard_id.tenant_id,
+        timeline_id,
+        tenant_shard_id.to_index(),
+        auth.map(String::from),
+        None,
+    )
+    .await?;

-        let req = page_api::LeaseLsnRequest { lsn };
-        match client.lease_lsn(req).await {
-            Ok(expires) => Ok(Some(expires)),
-            // Lease couldn't be acquired because the LSN has been garbage collected.
-            Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
-            Err(err) => Err(err.into()),
-        }
-    })
+    let req = page_api::LeaseLsnRequest { lsn };
+    match client.lease_lsn(req).await {
+        Ok(expires) => Ok(Some(expires)),
+        // Lease couldn't be acquired because the LSN has been garbage collected.
+        Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
+        Err(err) => Err(err.into()),
+    }
 }
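
Note: the core of this change is the bounded fan-out in try_acquire_lsn_lease.
Below is a minimal, self-contained sketch of that pattern, with illustrative
names only (fetch stands in for a per-shard lease request); it assumes the
same crates the diff uses (tokio, futures, anyhow):

use anyhow::Result;
use futures::StreamExt;

const MAX_CONCURRENT: usize = 8;

// Stand-in for acquire_lsn_lease_grpc/libpq: any fallible async call that
// may or may not return a value.
async fn fetch(id: u64) -> Result<Option<u64>> {
    Ok(Some(id))
}

#[tokio::main]
async fn main() -> Result<()> {
    // Build one future per target; nothing runs until the stream is polled.
    let mut jobs = Vec::new();
    for id in 0..32u64 {
        jobs.push(async move { fetch(id).await });
    }

    // buffer_unordered keeps at most MAX_CONCURRENT futures in flight and
    // yields results in completion order, not submission order.
    let mut stream = futures::stream::iter(jobs).buffer_unordered(MAX_CONCURRENT);

    let mut results = Vec::new();
    while let Some(res) = stream.next().await {
        results.push(res?); // propagate the first error, as the diff does
    }

    // Mirrors the diff's .flatten().min(): skip None entries and take the
    // smallest granted value, i.e. the earliest lease expiration.
    println!("{:?}", results.into_iter().flatten().min());
    Ok(())
}

One detail worth noting in the libpq path: tokio_postgres::Config::connect
returns a (Client, Connection) pair, and the Connection future must be driven
to completion for the Client to make progress, which is why the diff wraps
connection in a tokio::spawn.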