Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-05 20:42:54 +00:00
allow repeated IO errors from compute node (#4624)
## Problem

#4598: compute nodes are not accessible for some time after wake-up because the Kubernetes DNS records have not yet fully propagated.

## Summary of changes

Update the connect retry mechanism to handle IO errors from the compute node, sleeping between attempts starting at 100 ms with a 1.5x backoff.

## Checklist before requesting a review

- [x] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with the /release-notes label and add several sentences in this section.
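For orientation, this is the retry schedule the summary implies, assuming the 100 ms base wait (`BASE_RETRY_WAIT_DURATION`) and the 1.5x growth factor used in the diff below, where it is written as `3^n / 2^n` to stay in integer arithmetic. A minimal sketch, not the PR's code (note that in the actual code the very first retry happens immediately):

```rust
use std::time::Duration;

// Sketch of the backoff schedule: wait(n) = 100 ms * 1.5^n, computed as
// base * 3^n / 2^n so everything stays in integer Duration math.
fn main() {
    let base = Duration::from_millis(100);
    let mut total = Duration::ZERO;
    for n in 0..10u32 {
        let wait = base * 3_u32.pow(n) / 2_u32.pow(n);
        total += wait;
        println!("attempt {n}: wait {wait:?}");
    }
    // Roughly 11.3 s in total over ten attempts, consistent with the
    // 10-12 s window asserted by `connect_compute_total_wait` in the diff.
    println!("total: {total:?}");
}
```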
@@ -2,16 +2,15 @@ use parking_lot::Mutex;
use pq_proto::StartupMessageParams;
use std::fmt;
use std::{collections::HashMap, sync::Arc};

use futures::TryFutureExt;
use tokio::time;

use crate::config;
use crate::{auth, console};

use super::sql_over_http::MAX_RESPONSE_SIZE;

use crate::proxy::invalidate_cache;
use crate::proxy::NUM_RETRIES_WAKE_COMPUTE;
use crate::proxy::try_wake;
use crate::proxy::{BASE_RETRY_WAIT_DURATION, NUM_RETRIES_WAKE_COMPUTE};

use tracing::error;
use tracing::info;
@@ -223,32 +222,59 @@ async fn connect_to_compute(

// This code is a copy of `connect_to_compute` from `src/proxy.rs` with
// the difference that it uses `tokio_postgres` for the connection.
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
let mut num_retries = 0;
let mut should_wake = true;
loop {
match connect_to_compute_once(node_info, conn_info).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(&extra).await? {
// Update `node_info` and try one more time.
Some(new) => {
*node_info = new;
Err(e) if num_retries == NUM_RETRIES_WAKE_COMPUTE => {
if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
error!(error = ?e, "could not connect to compute node");
if should_wake {
match try_wake(node_info, &extra, &creds).await {
Ok(Some(x)) => should_wake = x,
Ok(None) => return Err(e.into()),
Err(e) => return Err(e.into()),
}
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
} else {
return Err(e.into());
}
}
other => return other,
other => return Ok(other?),
}

num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
num_retries += 1;
info!(retries_left = num_retries, "retrying connect");
}
}

fn retry_connect_in(err: &tokio_postgres::Error, num_retries: u32) -> Option<time::Duration> {
use tokio_postgres::error::SqlState;
match err.code() {
// retry all errors at least once immediately
_ if num_retries == 0 => Some(time::Duration::ZERO),
// keep retrying connection errors every 100ms
Some(
&SqlState::CONNECTION_FAILURE
| &SqlState::CONNECTION_EXCEPTION
| &SqlState::CONNECTION_DOES_NOT_EXIST
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
) => {
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
}
// otherwise, don't retry
_ => None,
}
}

async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
let mut config = (*node_info.config).clone();

let (client, connection) = config
@@ -257,15 +283,6 @@ async fn connect_to_compute_once(
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.inspect_err(|e: &tokio_postgres::Error| {
error!(
"failed to connect to compute node hosts={:?} ports={:?}: {}",
node_info.config.get_hosts(),
node_info.config.get_ports(),
e
);
invalidate_cache(node_info)
})
.await?;

tokio::spawn(async move {
@@ -6,12 +6,17 @@ use crate::{
cancellation::{self, CancelMap},
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, messages::MetricsAuxInfo},
console::{
self,
errors::{ApiError, WakeComputeError},
messages::MetricsAuxInfo,
},
error::io_error,
stream::{PqStream, Stream},
};
use anyhow::{bail, Context};
use futures::TryFutureExt;
use hyper::StatusCode;
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
@@ -25,7 +30,9 @@ use tracing::{error, info, warn};
use utils::measured_stream::MeasuredStream;

/// Number of times we should retry the `/proxy_wake_compute` http request.
pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10;
pub const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);

const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -315,7 +322,6 @@ async fn connect_to_compute_once(
node_info
.config
.connect(allow_self_signed_compute, timeout)
.inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info))
.await
}
@@ -328,7 +334,8 @@ async fn connect_to_compute(
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<PostgresConnection, compute::ConnectionError> {
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
let mut num_retries = 0;
let mut should_wake = true;
loop {
// Apply startup params to the (possibly, cached) compute node info.
node_info.config.set_startup_params(params);
@@ -346,30 +353,83 @@ async fn connect_to_compute(
// We only use caching in case of scram proxy backed by the console, so reduce
// the timeout only in that case.
let is_scram_proxy = matches!(creds, auth::BackendType::Console(_, _));
let timeout = if is_scram_proxy && num_retries == NUM_RETRIES_WAKE_COMPUTE {
let timeout = if is_scram_proxy && num_retries == 0 {
time::Duration::from_secs(2)
} else {
time::Duration::from_secs(10)
};

match connect_to_compute_once(node_info, timeout).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(extra).map_err(io_error).await? {
// Update `node_info` and try one more time.
Some(mut new) => {
new.config.reuse_password(&node_info.config);
*node_info = new;
Err(e) if num_retries < NUM_RETRIES_WAKE_COMPUTE => {
if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
error!(error = ?e, "could not connect to compute node");
if should_wake {
match try_wake(node_info, extra, creds).await {
Ok(Some(x)) => {
should_wake = x;
}
Ok(None) => return Err(e),
Err(e) => return Err(io_error(e).into()),
}
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
} else {
return Err(e);
}
}
other => return other,
}

num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
num_retries += 1;
info!(retries_left = num_retries, "retrying connect");
}
}

/// Attempts to wake up the compute node.
/// * Returns Ok(Some(true)) if there was an error waking but retries are acceptable
/// * Returns Ok(Some(false)) if the wakeup succeeded
/// * Returns Ok(None) or Err(e) if there was an error
pub async fn try_wake(
node_info: &mut console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<Option<bool>, WakeComputeError> {
info!("compute node's state has likely changed; requesting a wake-up");
invalidate_cache(node_info);
match creds.wake_compute(extra).await {
// retry wake if the compute was in an invalid state
Err(WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
})) => Ok(Some(true)),
// Update `node_info` and try again.
Ok(Some(mut new)) => {
new.config.reuse_password(&node_info.config);
*node_info = new;
Ok(Some(false))
}
Err(e) => Err(e),
Ok(None) => Ok(None),
}
}

fn retry_connect_in(err: &compute::ConnectionError, num_retries: u32) -> Option<time::Duration> {
use std::io::ErrorKind;
match err {
// retry all errors at least once immediately
_ if num_retries == 0 => Some(time::Duration::ZERO),
// keep retrying connection errors every 100ms
compute::ConnectionError::CouldNotConnect(io_err) => match io_err.kind() {
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable => {
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
}
_ => None,
},
// otherwise, don't retry
_ => None,
}
}
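The `io::ErrorKind` match in `retry_connect_in` above is the core of the change: only errors that look like the endpoint not being reachable yet (connection refused, address not available, which is what you see while DNS is still propagating) keep the backoff going; any other error is retried once and then surfaced. A standalone, hedged sketch of that decision, using plain `std::io::Error` rather than the proxy's `compute::ConnectionError`:

```rust
use std::io::{Error, ErrorKind};
use std::time::Duration;

// Assumed base wait, mirroring BASE_RETRY_WAIT_DURATION in the diff.
const BASE: Duration = Duration::from_millis(100);

// Standalone approximation of `retry_connect_in`: every error is retried once
// immediately; "endpoint not up yet" IO errors keep retrying with 1.5x backoff;
// everything else gives up.
fn retry_in(err: &Error, num_retries: u32) -> Option<Duration> {
    match err.kind() {
        _ if num_retries == 0 => Some(Duration::ZERO),
        ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable => {
            Some(BASE * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
        }
        _ => None,
    }
}

fn main() {
    let refused = Error::new(ErrorKind::ConnectionRefused, "conn refused");
    let denied = Error::new(ErrorKind::PermissionDenied, "denied");

    assert_eq!(retry_in(&refused, 2), Some(Duration::from_millis(225)));
    assert_eq!(retry_in(&denied, 0), Some(Duration::ZERO)); // first failure: retry anyway
    assert_eq!(retry_in(&denied, 1), None); // after that, non-connection errors give up
    println!("ok");
}
```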
@@ -1,4 +1,6 @@
//! A group of high-level tests for connection establishing logic and auth.
use std::io;

use super::*;
use crate::{auth, sasl, scram};
use async_trait::async_trait;
@@ -294,3 +296,18 @@ async fn scram_auth_mock() -> anyhow::Result<()> {

Ok(())
}

#[test]
fn connect_compute_total_wait() {
let err = compute::ConnectionError::CouldNotConnect(io::Error::new(
io::ErrorKind::ConnectionRefused,
"conn refused",
));

let mut total_wait = tokio::time::Duration::ZERO;
for num_retries in 0..10 {
total_wait += retry_connect_in(&err, num_retries).unwrap();
}
assert!(total_wait < tokio::time::Duration::from_secs(12));
assert!(total_wait > tokio::time::Duration::from_secs(10));
}
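For context on the 10 s and 12 s bounds in `connect_compute_total_wait`: attempt 0 waits zero, and attempts 1 through 9 each wait 100 ms times 1.5^n, so the total is a geometric series summing to about 11.2 s. A quick, self-contained check of that arithmetic (not part of the PR):

```rust
fn main() {
    // Closed form: 100 ms * (1.5^1 + ... + 1.5^9) = 100 ms * 1.5 * (1.5^9 - 1) / 0.5
    let closed_form_ms = 100.0 * 1.5 * (1.5f64.powi(9) - 1.0) / 0.5;
    // Same sum term by term, mirroring the test's loop (attempt 0 contributes nothing).
    let looped_ms: f64 = (1..10).map(|n| 100.0 * 1.5f64.powi(n)).sum();
    assert!((closed_form_ms - looped_ms).abs() < 1e-9);
    // Prints ~11233 ms, inside the (10 s, 12 s) window the test asserts.
    println!("total wait ~ {looped_ms:.0} ms");
}
```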