Compare commits

...

2 Commits

Author SHA1 Message Date
Alex Chi Z
5622bf0bd2 maybe the old test would work?
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-30 14:58:44 -04:00
Alex Chi Z
81d0f5d74f feat(compute_ctl): run lease lsn in tokio and in parallel
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-30 11:48:46 -04:00
2 changed files with 88 additions and 72 deletions

View File

@@ -1,13 +1,13 @@
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, SystemTime};
use anyhow::{Result, bail};
use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverProtocol};
use futures::StreamExt;
use pageserver_page_api as page_api;
use postgres::{NoTls, SimpleQueryMessage};
use tracing::{info, warn};
use tracing::{Instrument, info, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
@@ -15,7 +15,7 @@ use utils::shard::TenantShardId;
use crate::compute::ComputeNode;
/// Spawns a background thread to periodically renew LSN leases for static compute.
/// Do nothing if the compute is not in static mode.
/// Do nothing if the compute is not in static mode. MUST run this within a tokio runtime.
pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
let (tenant_id, timeline_id, lsn) = {
let state = compute.state.lock().unwrap();
@@ -28,24 +28,27 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
let compute = compute.clone();
let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
thread::spawn(move || {
let _entered = span.entered();
if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
// TODO: might need stronger error feedback than logging an warning.
warn!("Exited with error: {e}");
tokio::spawn(
async move {
if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn).await {
// TODO: might need stronger error feedback than logging an warning.
warn!("Exited with error: {e}");
}
}
});
.instrument(span),
);
}
/// Renews lsn lease periodically so static compute are not affected by GC.
fn lsn_lease_bg_task(
async fn lsn_lease_bg_task(
compute: Arc<ComputeNode>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<()> {
loop {
let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
let valid_until =
acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn).await?;
let valid_duration = valid_until
.duration_since(SystemTime::now())
.unwrap_or(Duration::ZERO);
@@ -59,13 +62,17 @@ fn lsn_lease_bg_task(
"Request succeeded, sleeping for {} seconds",
sleep_duration.as_secs()
);
compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
let compute = compute.clone();
tokio::task::spawn_blocking(move || {
compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
})
.await?;
}
}
/// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
/// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
fn acquire_lsn_lease_with_retry(
async fn acquire_lsn_lease_with_retry(
compute: &Arc<ComputeNode>,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -86,7 +93,8 @@ fn acquire_lsn_lease_with_retry(
)
};
let result = try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn);
let result =
try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn).await;
match result {
Ok(Some(res)) => {
return Ok(res);
@@ -109,15 +117,16 @@ fn acquire_lsn_lease_with_retry(
}
/// Tries to acquire LSN leases on all Pageserver shards.
fn try_acquire_lsn_lease(
async fn try_acquire_lsn_lease(
conninfo: PageserverConnectionInfo,
auth: Option<&str>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let mut leases = Vec::new();
const MAX_CONCURRENT_LEASE_CONNECTIONS: usize = 8;
let mut jobs = Vec::new();
for (shard_index, shard) in conninfo.shards.into_iter() {
let tenant_shard_id = TenantShardId {
tenant_id,
@@ -129,46 +138,68 @@ fn try_acquire_lsn_lease(
// leas on all of them? Currently, that's what we assume, but this is hypothetical
// as of this writing, as we never pass the info for more than one pageserver per
// shard.
for pageserver in shard.pageservers {
let lease = match conninfo.prefer_protocol {
PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
&pageserver.grpc_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?,
PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
&pageserver.libpq_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?,
};
leases.push(lease);
for shard in shard.pageservers {
let shard = shard.clone();
jobs.push(async move {
match conninfo.prefer_protocol {
PageserverProtocol::Grpc => {
acquire_lsn_lease_grpc(
&shard.grpc_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)
.await
}
PageserverProtocol::Libpq => {
acquire_lsn_lease_libpq(
&shard.libpq_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)
.await
}
}
});
}
}
Ok(leases.into_iter().min().flatten())
let mut stream = futures::stream::iter(jobs).buffer_unordered(MAX_CONCURRENT_LEASE_CONNECTIONS);
let mut leases = Vec::new();
while let Some(res) = stream.next().await {
let lease = res?;
leases.push(lease);
}
Ok(leases.into_iter().flatten().min())
}
/// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
/// postgresql:// scheme.
fn acquire_lsn_lease_libpq(
async fn acquire_lsn_lease_libpq(
connstring: &str,
auth: Option<&str>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let mut config = postgres::Config::from_str(connstring)?;
let mut config = tokio_postgres::Config::from_str(connstring)?;
if let Some(auth) = auth {
config.password(auth);
}
let mut client = config.connect(NoTls)?;
let (client, connection) = config.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
tracing::warn!("lease lsn connection error: {}", e);
}
});
let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
let res = client.simple_query(&cmd)?;
let res = client.simple_query(&cmd).await?;
let msg = match res.first() {
Some(msg) => msg,
None => bail!("empty response"),
@@ -186,35 +217,34 @@ fn acquire_lsn_lease_libpq(
.checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
.expect("Time larger than max SystemTime could handle")
});
Ok(valid_until)
}
/// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
/// grpc:// scheme.
fn acquire_lsn_lease_grpc(
async fn acquire_lsn_lease_grpc(
connstring: &str,
auth: Option<&str>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::connect(
connstring.to_string(),
tenant_shard_id.tenant_id,
timeline_id,
tenant_shard_id.to_index(),
auth.map(String::from),
None,
)
.await?;
let mut client = page_api::Client::connect(
connstring.to_string(),
tenant_shard_id.tenant_id,
timeline_id,
tenant_shard_id.to_index(),
auth.map(String::from),
None,
)
.await?;
let req = page_api::LeaseLsnRequest { lsn };
match client.lease_lsn(req).await {
Ok(expires) => Ok(Some(expires)),
// Lease couldn't be acquired because the LSN has been garbage collected.
Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
Err(err) => Err(err.into()),
}
})
let req = page_api::LeaseLsnRequest { lsn };
match client.lease_lsn(req).await {
Ok(expires) => Ok(Some(expires)),
// Lease couldn't be acquired because the LSN has been garbage collected.
Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
Err(err) => Err(err.into()),
}
}

View File

@@ -233,15 +233,6 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
log.info(f"`SELECT` query succeed after GC, {ctx=}")
return offset
# It's not reliable to let the compute renew the lease in this test case as we have a very tight
# lease timeout. Therefore, the test case itself will renew the lease.
#
# This is a workaround to make the test case more deterministic.
def renew_lease(env: NeonEnv, lease_lsn: Lsn):
env.storage_controller.pageserver_api().timeline_lsn_lease(
env.initial_tenant, env.initial_timeline, lease_lsn
)
# Insert some records on main branch
with env.endpoints.create_start("main", config_lines=["shared_buffers=1MB"]) as ep_main:
with ep_main.cursor() as cur:
@@ -254,9 +245,6 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
XLOG_BLCKSZ = 8192
lsn = Lsn((int(lsn) // XLOG_BLCKSZ) * XLOG_BLCKSZ)
# We need to mock the way cplane works: it gets a lease for a branch before starting the compute.
renew_lease(env, lsn)
with env.endpoints.create_start(
branch_name="main",
endpoint_id="static",
@@ -277,8 +265,6 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
ps.stop()
ps.start()
renew_lease(env, lsn)
trigger_gc_and_select(
env,
ep_static,