This commit is contained in:
Christian Schwarz
2025-07-17 17:01:31 +00:00
parent c9dbfd737d
commit ca82b739d3
5 changed files with 261 additions and 34 deletions

View File

@@ -493,6 +493,8 @@ impl ComputeNode {
launch_lsn_lease_bg_task_for_static(&this);
ro_replica::spawn_bg_task(Arc::clone(&this));
// We have a spec, start the compute
let mut delay_exit = false;
let mut vm_monitor = None;

View File

@@ -30,3 +30,4 @@ mod spec_apply;
pub mod swap;
pub mod sync_sk;
pub mod tls;
pub mod pageserver_client;

View File

@@ -13,7 +13,8 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::{ShardCount, ShardNumber, TenantShardId};
use crate::compute::ComputeNode;
use crate::compute::{ComputeNode, ParsedSpec};
use crate::pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect};
/// Spawns a background thread to periodically renew LSN leases for static compute.
/// Do nothing if the compute is not in static mode.
@@ -78,17 +79,13 @@ fn acquire_lsn_lease_with_retry(
loop {
// Note: List of pageservers is dynamic, need to re-read configs before each attempt.
let (connstrings, auth) = {
let shards = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
(
spec.pageserver_connstr.clone(),
spec.storage_auth_token.clone(),
)
let pspec = state.pspec.as_ref().expect("spec must be set");
pageserver_connstrings_for_connect(pspec)
};
let result =
try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
let result = try_acquire_lsn_lease(shards, timeline_id, lsn);
match result {
Ok(Some(res)) => {
return Ok(res);
@@ -112,33 +109,32 @@ fn acquire_lsn_lease_with_retry(
/// Tries to acquire LSN leases on all Pageserver shards.
fn try_acquire_lsn_lease(
connstrings: &str,
auth: Option<&str>,
tenant_id: TenantId,
shards: Vec<ConnectInfo>,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let connstrings = connstrings.split(',').collect_vec();
let shard_count = connstrings.len();
let mut leases = Vec::new();
for (shard_number, &connstring) in connstrings.iter().enumerate() {
let tenant_shard_id = match shard_count {
0 | 1 => TenantShardId::unsharded(tenant_id),
shard_count => TenantShardId {
tenant_id,
shard_number: ShardNumber(shard_number as u8),
shard_count: ShardCount::new(shard_count as u8),
},
};
let lease = match PageserverProtocol::from_connstring(connstring)? {
PageserverProtocol::Libpq => {
acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
PageserverProtocol::Grpc => {
acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
for ConnectInfo {
tenant_shard_id,
connstring,
auth,
} in shards
{
let lease = match PageserverProtocol::from_connstring(&connstring)? {
PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
&connstring,
auth.as_deref(),
tenant_shard_id,
timeline_id,
lsn,
)?,
PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
&connstring,
auth.as_deref(),
tenant_shard_id,
timeline_id,
lsn,
)?,
};
leases.push(lease);
}

View File

@@ -0,0 +1,43 @@
use itertools::Itertools;
use utils::{
id::TenantId,
shard::{ShardCount, ShardIndex, ShardNumber, TenantShardId},
};
use crate::compute::ParsedSpec;
/// Everything needed to talk to one pageserver shard.
///
/// Values of this type are produced by `pageserver_connstrings_for_connect`,
/// one per element of the spec's comma-separated pageserver connection string.
pub struct ConnectInfo {
/// Identity of the tenant shard that `connstring` belongs to.
pub tenant_shard_id: TenantShardId,
/// A single element of the spec's comma-separated `pageserver_connstr`.
pub connstring: String,
/// Copy of the spec's `storage_auth_token`; `None` when the spec carries no token.
pub auth: Option<String>,
}
/// Expands the spec's comma-separated `pageserver_connstr` into one
/// [`ConnectInfo`] per element, in the order the elements appear.
///
/// The position of an element is used as its shard number and the number of
/// elements as the shard count. When there is only a single element, the
/// unsharded tenant shard id is used instead. Every entry receives its own
/// clone of the spec's `storage_auth_token`.
pub fn pageserver_connstrings_for_connect(pspec: &ParsedSpec) -> Vec<ConnectInfo> {
    let per_shard = pspec.pageserver_connstr.split(',').collect_vec();
    let total = per_shard.len();

    per_shard
        .into_iter()
        .enumerate()
        .map(|(position, connstring)| {
            let tenant_shard_id = if total <= 1 {
                TenantShardId::unsharded(pspec.tenant_id)
            } else {
                TenantShardId {
                    tenant_id: pspec.tenant_id,
                    shard_number: ShardNumber(position as u8),
                    shard_count: ShardCount::new(total as u8),
                }
            };
            ConnectInfo {
                tenant_shard_id,
                connstring: connstring.to_string(),
                auth: pspec.storage_auth_token.clone(),
            }
        })
        .collect_vec()
}

View File

@@ -1,5 +1,20 @@
use tracing::warn;
use utils::lsn::Lsn;
use std::{
pin::Pin,
str::FromStr,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use compute_api::spec::PageserverProtocol;
use futures::{StreamExt, stream::FuturesUnordered};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info, info_span, warn};
use utils::{backoff::retry, id::TimelineId, lsn::Lsn};
use crate::{
compute::ComputeNode,
pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect},
};
#[derive(Default)]
pub(crate) struct GlobalState {
@@ -18,3 +33,173 @@ impl GlobalState {
});
}
}
/// Launches the read-replica background task on a dedicated OS thread.
///
/// The thread builds its own current-thread tokio runtime (all drivers
/// enabled) and blocks on [`bg_task`]. The join handle is dropped, so the
/// thread is detached.
///
/// # Panics
///
/// The spawned thread panics if the runtime cannot be built.
pub fn spawn_bg_task(compute: Arc<ComputeNode>) {
    std::thread::spawn(move || {
        let mut builder = tokio::runtime::Builder::new_current_thread();
        builder.enable_all();
        let runtime = builder.build().unwrap();
        runtime.block_on(bg_task(compute))
    });
}
/// Outcome of one update round performed by `attempt`.
///
/// `attempt` folds the per-shard results into a single value by taking the
/// minimum of each field, and `bg_task` sleeps until `expiration` before
/// starting the next round.
#[derive(Clone)]
struct Reservation {
/// LSN the reservation covers; `bg_task` triggers another round once the
/// minimum in-flight request LSN is too far ahead of this value.
horizon: Lsn,
/// Point in time at which `bg_task` starts the next round.
/// NOTE(review): presumably when the pageserver-side reservation lapses — confirm
/// once the per-shard calls are implemented.
expiration: Instant,
}
/// Background loop that keeps the standby_horizon position in the pageservers up to date.
///
/// After the first `min_inflight_request_lsn` value has been published, the
/// loop runs an update round (see [`attempt`], retried forever) whenever one
/// of the following happens:
///
/// * the current [`Reservation`] reaches its `expiration`,
/// * the helper thread signals that `pageserver_connstr` may have changed,
/// * after a 10 second debounce, the minimum in-flight request LSN is more
///   than 10 GiB ahead of the reserved horizon.
///
/// This function never returns.
async fn bg_task(compute: Arc<ComputeNode>) {
    // Wait until we have the first value.
    // Allows us to simply .unwrap() later because it never transitions back to None.
    let mut min_inflight_request_lsn_changed =
        compute.ro_replica.min_inflight_request_lsn.subscribe();
    min_inflight_request_lsn_changed.mark_changed();
    min_inflight_request_lsn_changed
        .wait_for(|value| value.is_some())
        .await;

    // The connstr wait is a blocking call, so it lives on its own OS thread and
    // pokes the async loop through a watch channel.
    let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(());
    std::thread::spawn({
        let compute = Arc::clone(&compute);
        move || {
            loop {
                // NOTE(review): the timeout is still a `todo!()`, so this thread panics the
                // first time it runs and `connstr_watch_tx` is dropped with it. Pick a real
                // timeout before shipping.
                compute
                    .wait_timeout_while_pageserver_connstr_unchanged(Duration::from_secs(todo!()));
                connstr_watch_tx.send_replace(());
            }
        }
    });

    // Start with an already-expired reservation so the first iteration updates right away.
    let mut reservation = Reservation {
        horizon: Lsn(0),
        expiration: Instant::now(),
    };
    loop {
        tokio::select! {
            _ = tokio::time::sleep_until(reservation.expiration.into()) => {
                info!("updating due to expiration");
            }
            _ = connstr_watch_rx.changed() => {
                info!("updating due to changed pageserver_connstr")
            }
            _ = async {
                // debounce TODO make this lower in tests
                // The sleep future must be awaited; otherwise it is dropped unpolled and
                // there is no debounce at all.
                tokio::time::sleep(Duration::from_secs(10)).await;
                // every 10 GiB TODO make this tighter in tests?
                let max_horizon_lag = 10 * (1<<30);
                min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > reservation.horizon.0 + max_horizon_lag).await
            } => {
                info!(%reservation.horizon, "updating due to max horizon lag");
            }
        }
        // retry forever
        let compute = Arc::clone(&compute);
        reservation = retry(
            || attempt(&compute),
            |_| false,
            0,
            u32::MAX, // forever
            "update standby_horizon position in pageserver",
            // There is no cancellation story in compute_ctl
            &CancellationToken::new(),
        )
        .await
        .expect("is_permanent returns false, so, retry always returns Some")
        .expect("u32::MAX exceeded");
    }
}
/// Runs one standby_horizon update round against every pageserver shard.
///
/// The shard list and timeline id are re-read from the current spec on each
/// call, and the LSN sent is the current `min_inflight_request_lsn`. All
/// shards are contacted concurrently. On success the returned [`Reservation`]
/// holds the minimum `horizon` and the minimum `expiration` across shards.
///
/// # Errors
///
/// * a connection string's protocol cannot be determined,
/// * at least one shard failed (details are logged per shard),
/// * the list of shards is empty.
///
/// # Panics
///
/// Panics if the spec is not set or if `min_inflight_request_lsn` is still `None`.
async fn attempt(compute: &Arc<ComputeNode>) -> anyhow::Result<Reservation> {
    let (shards, timeline_id) = {
        let state = compute.state.lock().unwrap();
        let pspec = state.pspec.as_ref().expect("spec must be set");
        (pageserver_connstrings_for_connect(pspec), pspec.timeline_id)
    };

    let lsn = compute
        .ro_replica
        .min_inflight_request_lsn
        .borrow()
        .expect("we only call this function once it has been transitioned to Some");

    let mut futs = FuturesUnordered::new();
    for connect_info in shards {
        let logging_span = info_span!(
            "attempt_one",
            tenant_id=%connect_info.tenant_shard_id.tenant_id,
            shard_id=%connect_info.tenant_shard_id.shard_slug(),
            timeline_id=%timeline_id,
        );
        // Logs the failure inside the per-shard span and erases the error, so the
        // collection loop below only has to count failures.
        let logging_wrapper = |fut: Pin<Box<dyn Future<Output = anyhow::Result<Reservation>>>>| {
            async move {
                match fut.await {
                    Ok(v) => Ok(v),
                    Err(err) => {
                        error!(
                            "failed to advance standby_horizon, communicator reads from this shard may star failing: {err:?}"
                        );
                        Err(())
                    }
                }
            }
            .instrument(logging_span)
        };
        let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? {
            PageserverProtocol::Libpq => {
                logging_wrapper(Box::pin(attempt_one_libpq(connect_info, timeline_id, lsn)))
            }
            PageserverProtocol::Grpc => {
                logging_wrapper(Box::pin(attempt_one_grpc(connect_info, timeline_id, lsn)))
            }
        };
        futs.push(fut);
    }

    let mut errors = 0;
    let mut min = None;
    while let Some(res) = futs.next().await {
        match res {
            Ok(reservation) => {
                let Reservation {
                    horizon,
                    expiration,
                } = min.get_or_insert_with(|| reservation.clone());
                *horizon = std::cmp::min(*horizon, reservation.horizon);
                *expiration = std::cmp::min(*expiration, reservation.expiration);
            }
            Err(()) => {
                // the logging wrapper does the logging
                // Count the failure so that a partially failed round is reported as an
                // error below instead of silently succeeding.
                errors += 1;
            }
        }
    }
    if errors > 0 {
        return Err(anyhow::anyhow!(
            "failed to advance standby_horizon for {errors} shards, check logs for details"
        ));
    }
    match min {
        Some(min) => Ok(min),
        None => Err(anyhow::anyhow!("pageservers connstrings is empty")), // this probably can't happen
    }
}
/// Per-shard update for pageservers reached over the libpq protocol.
///
/// NOTE(review): unfinished. It currently only checks that `connstring` parses as a
/// `tokio_postgres::Config` (returning the parse error otherwise) and then hits
/// `todo!()`; `tenant_shard_id`, `auth`, `timeline_id` and `lsn` are not used yet.
async fn attempt_one_libpq(
connect_info: ConnectInfo,
timeline_id: TimelineId,
lsn: Lsn,
) -> anyhow::Result<Reservation> {
let ConnectInfo {
tenant_shard_id,
connstring,
auth,
} = connect_info;
// The parsed config is discarded for now; only the `?` error propagation has an effect.
tokio_postgres::Config::from_str(&connstring)?;
todo!()
}
/// Per-shard update for pageservers reached over the gRPC protocol.
///
/// NOTE(review): not implemented yet — the body is a bare `todo!()`, so awaiting
/// this future panics.
async fn attempt_one_grpc(
connect_info: ConnectInfo,
timeline_id: TimelineId,
lsn: Lsn,
) -> anyhow::Result<Reservation> {
todo!()
}