diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs
index 6abfea82e0..a04c986fbb 100644
--- a/compute_tools/src/lsn_lease.rs
+++ b/compute_tools/src/lsn_lease.rs
@@ -1,13 +1,13 @@
 use std::str::FromStr;
 use std::sync::Arc;
-use std::thread;
 use std::time::{Duration, SystemTime};
 
 use anyhow::{Result, bail};
 use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverProtocol};
+use futures::StreamExt;
 use pageserver_page_api as page_api;
 use postgres::{NoTls, SimpleQueryMessage};
-use tracing::{info, warn};
+use tracing::{Instrument, info, warn};
 use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 use utils::shard::TenantShardId;
@@ -15,7 +15,7 @@ use utils::shard::TenantShardId;
 use crate::compute::ComputeNode;
 
 /// Spawns a background thread to periodically renew LSN leases for static compute.
-/// Do nothing if the compute is not in static mode.
+/// Do nothing if the compute is not in static mode. MUST run this within a tokio runtime.
 pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
     let (tenant_id, timeline_id, lsn) = {
         let state = compute.state.lock().unwrap();
@@ -28,24 +28,27 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
     let compute = compute.clone();
 
     let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
-    thread::spawn(move || {
-        let _entered = span.entered();
-        if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
-            // TODO: might need stronger error feedback than logging an warning.
-            warn!("Exited with error: {e}");
+    tokio::spawn(
+        async move {
+            if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn).await {
+                // TODO: might need stronger error feedback than logging a warning.
+                warn!("Exited with error: {e}");
+            }
         }
-    });
+        .instrument(span),
+    );
 }
 
 /// Renews lsn lease periodically so static compute are not affected by GC.
-fn lsn_lease_bg_task(
+async fn lsn_lease_bg_task(
     compute: Arc<ComputeNode>,
     tenant_id: TenantId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<()> {
     loop {
-        let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
+        let valid_until =
+            acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn).await?;
         let valid_duration = valid_until
             .duration_since(SystemTime::now())
             .unwrap_or(Duration::ZERO);
@@ -65,7 +68,7 @@
 
 /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
 /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
-fn acquire_lsn_lease_with_retry(
+async fn acquire_lsn_lease_with_retry(
     compute: &Arc<ComputeNode>,
     tenant_id: TenantId,
     timeline_id: TimelineId,
@@ -86,7 +89,8 @@
             )
         };
-        let result = try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn);
+        let result =
+            try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn).await;
         match result {
             Ok(Some(res)) => {
                 return Ok(res);
             }
@@ -109,15 +113,16 @@
     }
 }
 
 /// Tries to acquire LSN leases on all Pageserver shards.
-fn try_acquire_lsn_lease(
+async fn try_acquire_lsn_lease(
     conninfo: PageserverConnectionInfo,
     auth: Option<&str>,
     tenant_id: TenantId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<Option<SystemTime>> {
-    let mut leases = Vec::new();
+    const MAX_CONCURRENT_LEASE_CONNECTIONS: usize = 8;
+    let mut jobs = Vec::new();
     for (shard_index, shard) in conninfo.shards.into_iter() {
         let tenant_shard_id = TenantShardId {
             tenant_id,
@@ -129,46 +134,68 @@ fn try_acquire_lsn_lease(
         // leas on all of them? Currently, that's what we assume, but this is hypothetical
         // as of this writing, as we never pass the info for more than one pageserver per
         // shard.
-        for pageserver in shard.pageservers {
-            let lease = match conninfo.prefer_protocol {
-                PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
-                    &pageserver.grpc_url.unwrap(),
-                    auth,
-                    tenant_shard_id,
-                    timeline_id,
-                    lsn,
-                )?,
-                PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
-                    &pageserver.libpq_url.unwrap(),
-                    auth,
-                    tenant_shard_id,
-                    timeline_id,
-                    lsn,
-                )?,
-            };
-            leases.push(lease);
+
+        for shard in shard.pageservers {
+            let shard = shard.clone();
+            jobs.push(async move {
+                match conninfo.prefer_protocol {
+                    PageserverProtocol::Grpc => {
+                        acquire_lsn_lease_grpc(
+                            &shard.grpc_url.unwrap(),
+                            auth,
+                            tenant_shard_id,
+                            timeline_id,
+                            lsn,
+                        )
+                        .await
+                    }
+                    PageserverProtocol::Libpq => {
+                        acquire_lsn_lease_libpq(
+                            &shard.libpq_url.unwrap(),
+                            auth,
+                            tenant_shard_id,
+                            timeline_id,
+                            lsn,
+                        )
+                        .await
+                    }
+                }
+            });
         }
     }
-    Ok(leases.into_iter().min().flatten())
+
+    let mut stream = futures::stream::iter(jobs).buffer_unordered(MAX_CONCURRENT_LEASE_CONNECTIONS);
+    let mut leases = Vec::new();
+    while let Some(res) = stream.next().await {
+        let lease = res?;
+        leases.push(lease);
+    }
+
+    Ok(leases.into_iter().flatten().min())
 }
 
 /// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
 /// postgresql:// scheme.
-fn acquire_lsn_lease_libpq(
+async fn acquire_lsn_lease_libpq(
     connstring: &str,
     auth: Option<&str>,
     tenant_shard_id: TenantShardId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<Option<SystemTime>> {
-    let mut config = postgres::Config::from_str(connstring)?;
+    let mut config = tokio_postgres::Config::from_str(connstring)?;
     if let Some(auth) = auth {
         config.password(auth);
     }
-    let mut client = config.connect(NoTls)?;
+    let (client, connection) = config.connect(NoTls).await?;
+
+    tokio::spawn(async move {
+        if let Err(e) = connection.await {
+            tracing::warn!("lease lsn connection error: {}", e);
+        }
+    });
+
     let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
-    let res = client.simple_query(&cmd)?;
+    let res = client.simple_query(&cmd).await?;
     let msg = match res.first() {
         Some(msg) => msg,
         None => bail!("empty response"),
@@ -186,35 +213,34 @@
             .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
             .expect("Time larger than max SystemTime could handle")
     });
+
     Ok(valid_until)
 }
 
 /// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
 /// grpc:// scheme.
-fn acquire_lsn_lease_grpc(
+async fn acquire_lsn_lease_grpc(
     connstring: &str,
     auth: Option<&str>,
     tenant_shard_id: TenantShardId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<Option<SystemTime>> {
-    tokio::runtime::Handle::current().block_on(async move {
-        let mut client = page_api::Client::connect(
-            connstring.to_string(),
-            tenant_shard_id.tenant_id,
-            timeline_id,
-            tenant_shard_id.to_index(),
-            auth.map(String::from),
-            None,
-        )
-        .await?;
+    let mut client = page_api::Client::connect(
+        connstring.to_string(),
+        tenant_shard_id.tenant_id,
+        timeline_id,
+        tenant_shard_id.to_index(),
+        auth.map(String::from),
+        None,
+    )
+    .await?;
 
-        let req = page_api::LeaseLsnRequest { lsn };
-        match client.lease_lsn(req).await {
-            Ok(expires) => Ok(Some(expires)),
-            // Lease couldn't be acquired because the LSN has been garbage collected.
-            Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
-            Err(err) => Err(err.into()),
-        }
-    })
+    let req = page_api::LeaseLsnRequest { lsn };
+    match client.lease_lsn(req).await {
+        Ok(expires) => Ok(Some(expires)),
+        // Lease couldn't be acquired because the LSN has been garbage collected.
+        Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
+        Err(err) => Err(err.into()),
+    }
 }
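Reviewer note: the move from `thread::spawn` to `tokio::spawn` also changes how the tracing span is attached. `span.entered()` returns a guard that must not be held across an `.await`, so the patch attaches the span to the future with `.instrument(span)` instead. A minimal sketch of that pattern, with illustrative names that are not from the patch:

```rust
// Sketch: attaching a tracing span to a spawned async task.
// Holding a `span.entered()` guard across an .await would associate
// unrelated poll wake-ups with the span; `.instrument()` avoids that.
use tracing::{Instrument, info};

async fn renew_forever() {
    // Every log line (and every .await) inside runs within the attached span.
    info!("renewing lease");
}

#[tokio::main]
async fn main() {
    let span = tracing::info_span!("lsn_lease_bg_task", tenant = "t1");
    tokio::spawn(renew_forever().instrument(span)).await.unwrap();
}
```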
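The rewritten `try_acquire_lsn_lease` collects one future per pageserver and drives them through `buffer_unordered`, which caps in-flight connections at `MAX_CONCURRENT_LEASE_CONNECTIONS` while letting results complete in any order. A self-contained sketch of that aggregation, with a dummy `lease_one` standing in for the per-shard lease calls:

```rust
// Sketch: bounded concurrent fan-out with buffer_unordered, mirroring
// the aggregation in try_acquire_lsn_lease. `lease_one` is a stand-in.
use anyhow::Result;
use futures::StreamExt;

async fn lease_one(_url: String) -> Result<Option<u64>> {
    // Pretend every shard grants a lease with the same dummy expiry.
    Ok(Some(42))
}

#[tokio::main]
async fn main() -> Result<()> {
    let urls = vec!["shard-0".to_string(), "shard-1".to_string()];
    // Build the futures first, then drive at most 8 of them at a time.
    let jobs: Vec<_> = urls.into_iter().map(lease_one).collect();
    let mut stream = futures::stream::iter(jobs).buffer_unordered(8);

    let mut leases = Vec::new();
    while let Some(res) = stream.next().await {
        leases.push(res?); // propagate the first connection error
    }
    // flatten() drops shards that returned None; min() keeps the earliest expiry.
    let earliest = leases.into_iter().flatten().min();
    println!("{earliest:?}");
    Ok(())
}
```

Note the patch's `flatten().min()` (vs the old `min().flatten()`): a single `None` from one shard no longer masks valid leases from the others.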
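The libpq path swaps blocking `postgres` for `tokio-postgres`, whose `connect` hands back a client plus a separate connection future that must be spawned onto the runtime, otherwise no query makes progress. That is what the added `tokio::spawn` in `acquire_lsn_lease_libpq` does. A sketch with a placeholder connection string:

```rust
// Sketch: the client/connection split in tokio-postgres.
// The connection string below is a placeholder, not from the patch.
use std::str::FromStr;
use tokio_postgres::{Config, NoTls};

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    let config = Config::from_str("postgresql://localhost:5432")?;
    let (client, connection) = config.connect(NoTls).await?;

    // The connection object performs the actual socket I/O;
    // poll it on its own task so client queries can complete.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    // simple_query is now async; the `lease lsn ...` command would go here.
    let _rows = client.simple_query("SELECT 1").await?;
    Ok(())
}
```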
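On the gRPC path, a lease that cannot be granted because the LSN was already garbage-collected surfaces as `tonic::Code::FailedPrecondition`, which the code maps to `Ok(None)` rather than an error, so the retry loop treats it as a definitive refusal. A sketch of that mapping; `map_lease_result` is a hypothetical helper, not part of the patch:

```rust
// Sketch: distinguishing "LSN already GC'd" from real transport errors
// via the tonic status code, as acquire_lsn_lease_grpc does.
use anyhow::Result;

fn map_lease_result(res: Result<u64, tonic::Status>) -> Result<Option<u64>> {
    match res {
        Ok(expires) => Ok(Some(expires)),
        // Lease not granted: the LSN has been garbage collected.
        Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
        Err(err) => Err(err.into()),
    }
}

fn main() -> Result<()> {
    assert_eq!(map_lease_result(Ok(42))?, Some(42));
    let gcd = map_lease_result(Err(tonic::Status::failed_precondition("lsn gc'd")))?;
    assert_eq!(gcd, None);
    Ok(())
}
```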