diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 8ccb625ff9..e287800164 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -493,6 +493,8 @@ impl ComputeNode { launch_lsn_lease_bg_task_for_static(&this); + ro_replica::spawn_bg_task(Arc::clone(&this)); + // We have a spec, start the compute let mut delay_exit = false; let mut vm_monitor = None; diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 1874bdea9a..b231f6db8c 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -30,3 +30,4 @@ mod spec_apply; pub mod swap; pub mod sync_sk; pub mod tls; +pub mod pageserver_client; diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index bb0828429d..52a669daa6 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -13,7 +13,8 @@ use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use utils::shard::{ShardCount, ShardNumber, TenantShardId}; -use crate::compute::ComputeNode; +use crate::compute::{ComputeNode, ParsedSpec}; +use crate::pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect}; /// Spawns a background thread to periodically renew LSN leases for static compute. /// Do nothing if the compute is not in static mode. @@ -78,17 +79,13 @@ fn acquire_lsn_lease_with_retry( loop { // Note: List of pageservers is dynamic, need to re-read configs before each attempt. - let (connstrings, auth) = { + let shards = { let state = compute.state.lock().unwrap(); - let spec = state.pspec.as_ref().expect("spec must be set"); - ( - spec.pageserver_connstr.clone(), - spec.storage_auth_token.clone(), - ) + let pspec = state.pspec.as_ref().expect("spec must be set"); + pageserver_connstrings_for_connect(pspec) }; - let result = - try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn); + let result = try_acquire_lsn_lease(shards, timeline_id, lsn); match result { Ok(Some(res)) => { return Ok(res); @@ -112,33 +109,32 @@ fn acquire_lsn_lease_with_retry( /// Tries to acquire LSN leases on all Pageserver shards. fn try_acquire_lsn_lease( - connstrings: &str, - auth: Option<&str>, - tenant_id: TenantId, + shards: Vec, timeline_id: TimelineId, lsn: Lsn, ) -> Result> { - let connstrings = connstrings.split(',').collect_vec(); - let shard_count = connstrings.len(); let mut leases = Vec::new(); - - for (shard_number, &connstring) in connstrings.iter().enumerate() { - let tenant_shard_id = match shard_count { - 0 | 1 => TenantShardId::unsharded(tenant_id), - shard_count => TenantShardId { - tenant_id, - shard_number: ShardNumber(shard_number as u8), - shard_count: ShardCount::new(shard_count as u8), - }, - }; - - let lease = match PageserverProtocol::from_connstring(connstring)? { - PageserverProtocol::Libpq => { - acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)? - } - PageserverProtocol::Grpc => { - acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)? - } + for ConnectInfo { + tenant_shard_id, + connstring, + auth, + } in shards + { + let lease = match PageserverProtocol::from_connstring(&connstring)? { + PageserverProtocol::Libpq => acquire_lsn_lease_libpq( + &connstring, + auth.as_deref(), + tenant_shard_id, + timeline_id, + lsn, + )?, + PageserverProtocol::Grpc => acquire_lsn_lease_grpc( + &connstring, + auth.as_deref(), + tenant_shard_id, + timeline_id, + lsn, + )?, }; leases.push(lease); } diff --git a/compute_tools/src/pageserver_client.rs b/compute_tools/src/pageserver_client.rs new file mode 100644 index 0000000000..27a49575ea --- /dev/null +++ b/compute_tools/src/pageserver_client.rs @@ -0,0 +1,43 @@ +use itertools::Itertools; +use utils::{ + id::TenantId, + shard::{ShardCount, ShardIndex, ShardNumber, TenantShardId}, +}; + +use crate::compute::ParsedSpec; + +pub struct ConnectInfo { + pub tenant_shard_id: TenantShardId, + pub connstring: String, + pub auth: Option, +} + +pub fn pageserver_connstrings_for_connect( + pspec: &ParsedSpec, +) -> Vec { + let connstrings = &pspec.pageserver_connstr; + let auth = pspec.storage_auth_token.clone(); + + let connstrings = connstrings.split(',').collect_vec(); + let shard_count = connstrings.len(); + + let mut infos = Vec::with_capacity(connstrings.len()); + for (shard_number, connstring) in connstrings.iter().enumerate() { + let tenant_shard_id = match shard_count { + 0 | 1 => TenantShardId::unsharded(pspec.tenant_id), + shard_count => TenantShardId { + tenant_id: pspec.tenant_id, + shard_number: ShardNumber(shard_number as u8), + shard_count: ShardCount::new(shard_count as u8), + }, + }; + + infos.push(ConnectInfo { + tenant_shard_id, + connstring: connstring.to_string(), + auth: auth.clone(), + }); + } + + infos +} diff --git a/compute_tools/src/ro_replica.rs b/compute_tools/src/ro_replica.rs index d221d0251d..eb6c5f535a 100644 --- a/compute_tools/src/ro_replica.rs +++ b/compute_tools/src/ro_replica.rs @@ -1,5 +1,20 @@ -use tracing::warn; -use utils::lsn::Lsn; +use std::{ + pin::Pin, + str::FromStr, + sync::Arc, + time::{Duration, Instant, SystemTime}, +}; + +use compute_api::spec::PageserverProtocol; +use futures::{StreamExt, stream::FuturesUnordered}; +use tokio_util::sync::CancellationToken; +use tracing::{Instrument, error, info, info_span, warn}; +use utils::{backoff::retry, id::TimelineId, lsn::Lsn}; + +use crate::{ + compute::ComputeNode, + pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect}, +}; #[derive(Default)] pub(crate) struct GlobalState { @@ -18,3 +33,173 @@ impl GlobalState { }); } } + +pub fn spawn_bg_task(compute: Arc) { + std::thread::spawn(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(bg_task(compute)) + }); +} + +#[derive(Clone)] +struct Reservation { + horizon: Lsn, + expiration: Instant, +} + +async fn bg_task(compute: Arc) { + // Wait until we have the first value. + // Allows us to simply .unwrap() later because it never transitions back to None. + let mut min_inflight_request_lsn_changed = + compute.ro_replica.min_inflight_request_lsn.subscribe(); + min_inflight_request_lsn_changed.mark_changed(); + min_inflight_request_lsn_changed + .wait_for(|value| value.is_some()) + .await; + + let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(()); + std::thread::spawn({ + let compute = Arc::clone(&compute); + move || { + loop { + compute + .wait_timeout_while_pageserver_connstr_unchanged(Duration::from_secs(todo!())); + connstr_watch_tx.send_replace(()); + } + } + }); + + let mut reservation = Reservation { + horizon: Lsn(0), + expiration: Instant::now(), + }; + loop { + tokio::select! { + _ = tokio::time::sleep_until(reservation.expiration.into()) => { + info!("updating due to expiration"); + } + _ = connstr_watch_rx.changed() => { + info!("updating due to changed pageserver_connstr") + } + _ = async { + // debounce TODO make this lower in tests + tokio::time::sleep(Duration::from_secs(10)); + // every 10 GiB TODO make this tighter in tests? + let max_horizon_lag = 10 * (1<<30); + min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > reservation.horizon.0 + max_horizon_lag).await + } => { + info!(%reservation.horizon, "updating due to max horizon lag"); + } + } + // retry forever + let compute = Arc::clone(&compute); + reservation = retry( + || attempt(&compute), + |_| false, + 0, + u32::MAX, // forever + "update standby_horizon position in pageserver", + // There is no cancellation story in compute_ctl + &CancellationToken::new(), + ) + .await + .expect("is_permanent returns false, so, retry always returns Some") + .expect("u32::MAX exceeded"); + } +} + +// Returns expiration time +async fn attempt(compute: &Arc) -> anyhow::Result { + let (shards, timeline_id) = { + let state = compute.state.lock().unwrap(); + let pspec = state.pspec.as_ref().expect("spec must be set"); + (pageserver_connstrings_for_connect(pspec), pspec.timeline_id) + }; + + let lsn = compute + .ro_replica + .min_inflight_request_lsn + .borrow() + .expect("we only call this function once it has been transitioned to Some"); + let mut futs = FuturesUnordered::new(); + for connect_info in shards { + let logging_span = info_span!( + "attempt_one", + tenant_id=%connect_info.tenant_shard_id.tenant_id, + shard_id=%connect_info.tenant_shard_id.shard_slug(), + timeline_id=%timeline_id, + ); + let logging_wrapper = |fut: Pin>>>| { + async move { + match fut.await { + Ok(v) => Ok(v), + Err(err) => { + error!( + "failed to advance standby_horizon, communicator reads from this shard may star failing: {err:?}" + ); + Err(()) + } + }}.instrument(logging_span) + }; + let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? { + PageserverProtocol::Libpq => { + logging_wrapper(Box::pin(attempt_one_libpq(connect_info, timeline_id, lsn))) + } + PageserverProtocol::Grpc => { + logging_wrapper(Box::pin(attempt_one_grpc(connect_info, timeline_id, lsn))) + } + }; + futs.push(fut); + } + let mut errors = 0; + let mut min = None; + while let Some(res) = futs.next().await { + match res { + Ok(reservation) => { + let Reservation { + horizon, + expiration, + } = min.get_or_insert_with(|| reservation.clone()); + *horizon = std::cmp::min(*horizon, reservation.horizon); + *expiration = std::cmp::min(*expiration, reservation.expiration); + } + Err(()) => { + // the logging wrapper does the logging + } + } + } + if errors > 0 { + return Err(anyhow::anyhow!( + "failed to advance standby_horizon for {errors} shards, check logs for details" + )); + } + match min { + Some(min) => Ok(min), + None => Err(anyhow::anyhow!("pageservers connstrings is empty")), // this probably can't happen + } +} + +async fn attempt_one_libpq( + connect_info: ConnectInfo, + timeline_id: TimelineId, + lsn: Lsn, +) -> anyhow::Result { + let ConnectInfo { + tenant_shard_id, + connstring, + auth, + } = connect_info; + tokio_postgres::Config::from_str(&connstring)?; + todo!() +} + +async fn attempt_one_grpc( + connect_info: ConnectInfo, + timeline_id: TimelineId, + lsn: Lsn, +) -> anyhow::Result { + todo!() +}