mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 20:12:54 +00:00
fix(compute_ctl): race if pageserver connstr changes
Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -2648,7 +2648,11 @@ LIMIT 100",
|
||||
/// the pageserver connection strings has changed.
|
||||
///
|
||||
/// The operation will time out after a specified duration.
|
||||
pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
|
||||
pub fn wait_timeout_while_pageserver_connstr_unchanged(
|
||||
&self,
|
||||
duration: Duration,
|
||||
request_pageserver_conninfo: &PageserverConnectionInfo,
|
||||
) {
|
||||
let state = self.state.lock().unwrap();
|
||||
let old_pageserver_conninfo = state
|
||||
.pspec
|
||||
@@ -2656,6 +2660,10 @@ LIMIT 100",
|
||||
.expect("spec must be set")
|
||||
.pageserver_conninfo
|
||||
.clone();
|
||||
if request_pageserver_conninfo != &old_pageserver_conninfo {
|
||||
info!("Pageserver config changed during the previous request");
|
||||
return;
|
||||
}
|
||||
let mut unchanged = true;
|
||||
let _ = self
|
||||
.state_changed
|
||||
|
||||
@@ -47,7 +47,7 @@ async fn lsn_lease_bg_task(
|
||||
lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
let valid_until =
|
||||
let (valid_until, last_conninfo) =
|
||||
acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn).await?;
|
||||
let valid_duration = valid_until
|
||||
.duration_since(SystemTime::now())
|
||||
@@ -64,7 +64,7 @@ async fn lsn_lease_bg_task(
|
||||
);
|
||||
let compute = compute.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
|
||||
compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration, &last_conninfo);
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
@@ -77,7 +77,7 @@ async fn acquire_lsn_lease_with_retry(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
) -> Result<SystemTime> {
|
||||
) -> Result<(SystemTime, PageserverConnectionInfo)> {
|
||||
let mut attempts = 0usize;
|
||||
let mut retry_period_ms: f64 = 500.0;
|
||||
const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
|
||||
@@ -93,11 +93,17 @@ async fn acquire_lsn_lease_with_retry(
|
||||
)
|
||||
};
|
||||
|
||||
let result =
|
||||
try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn).await;
|
||||
let result = try_acquire_lsn_lease(
|
||||
conninfo.clone(),
|
||||
auth.as_deref(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(Some(res)) => {
|
||||
return Ok(res);
|
||||
return Ok((res, conninfo));
|
||||
}
|
||||
Ok(None) => {
|
||||
bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
|
||||
@@ -105,9 +111,15 @@ async fn acquire_lsn_lease_with_retry(
|
||||
Err(e) => {
|
||||
warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
|
||||
|
||||
compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
|
||||
retry_period_ms as u64,
|
||||
));
|
||||
let compute = compute.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
compute.wait_timeout_while_pageserver_connstr_unchanged(
|
||||
Duration::from_millis(retry_period_ms as u64),
|
||||
&conninfo,
|
||||
);
|
||||
})
|
||||
.await?;
|
||||
|
||||
retry_period_ms *= 1.5;
|
||||
retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user