Merge pull request #4705 from neondatabase/release-2023-07-12

Release 2023-07-12 (only proxy)
This commit is contained in:
Alexander Bayandin
2023-07-12 19:44:36 +01:00
committed by GitHub
3 changed files with 191 additions and 53 deletions

View File

@@ -1,17 +1,16 @@
use parking_lot::Mutex;
use pq_proto::StartupMessageParams;
use std::fmt;
use std::ops::ControlFlow;
use std::{collections::HashMap, sync::Arc};
use futures::TryFutureExt;
use tokio::time;
use crate::config;
use crate::{auth, console};
use super::sql_over_http::MAX_RESPONSE_SIZE;
use crate::proxy::invalidate_cache;
use crate::proxy::NUM_RETRIES_WAKE_COMPUTE;
use crate::proxy::{invalidate_cache, retry_after, try_wake, NUM_RETRIES_WAKE_COMPUTE};
use tracing::error;
use tracing::info;
@@ -185,11 +184,10 @@ impl GlobalConnPool {
}
}
//
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
//
#[tracing::instrument(skip_all)]
async fn connect_to_compute(
config: &config::ProxyConfig,
conn_info: &ConnInfo,
@@ -221,34 +219,79 @@ async fn connect_to_compute(
let node_info = &mut creds.wake_compute(&extra).await?.expect("msg");
// This code is a copy of `connect_to_compute` from `src/proxy.rs` with
// the difference that it uses `tokio_postgres` for the connection.
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
let mut num_retries = 0;
let mut wait_duration = time::Duration::ZERO;
let mut should_wake_with_error = None;
loop {
match connect_to_compute_once(node_info, conn_info).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(&extra).await? {
// Update `node_info` and try one more time.
Some(new) => {
*node_info = new;
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
}
}
other => return other,
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
// try wake the compute node if we have determined it's sensible to do so
if let Some(err) = should_wake_with_error.take() {
match try_wake(node_info, &extra, &creds).await {
// we can't wake up the compute node
Ok(None) => return Err(err),
// there was an error communicating with the control plane
Err(e) => return Err(e.into()),
// failed to wake up but we can continue to retry
Ok(Some(ControlFlow::Continue(()))) => {
wait_duration = retry_after(num_retries);
should_wake_with_error = Some(err);
num_retries += 1;
info!(num_retries, "retrying wake compute");
continue;
}
// successfully woke up a compute node and can break the wakeup loop
Ok(Some(ControlFlow::Break(()))) => {}
}
}
match connect_to_compute_once(node_info, conn_info).await {
Ok(res) => return Ok(res),
Err(e) => {
error!(error = ?e, "could not connect to compute node");
if !can_retry_error(&e, num_retries) {
return Err(e.into());
}
wait_duration = retry_after(num_retries);
// after the first connect failure,
// we should invalidate the cache and wake up a new compute node
if num_retries == 0 {
invalidate_cache(node_info);
should_wake_with_error = Some(e.into());
}
}
}
num_retries += 1;
info!(num_retries, "retrying connect");
}
}
fn can_retry_error(err: &tokio_postgres::Error, num_retries: u32) -> bool {
use tokio_postgres::error::SqlState;
match err.code() {
// retry all errors at least once
_ if num_retries == 0 => true,
// keep retrying connection errors
Some(
&SqlState::CONNECTION_FAILURE
| &SqlState::CONNECTION_EXCEPTION
| &SqlState::CONNECTION_DOES_NOT_EXIST
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
) if num_retries < NUM_RETRIES_WAKE_COMPUTE => true,
// otherwise, don't retry
_ => false,
}
}
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
let mut config = (*node_info.config).clone();
let (client, connection) = config
@@ -257,15 +300,6 @@ async fn connect_to_compute_once(
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.inspect_err(|e: &tokio_postgres::Error| {
error!(
"failed to connect to compute node hosts={:?} ports={:?}: {}",
node_info.config.get_hosts(),
node_info.config.get_ports(),
e
);
invalidate_cache(node_info)
})
.await?;
tokio::spawn(async move {

View File

@@ -6,16 +6,21 @@ use crate::{
cancellation::{self, CancelMap},
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, messages::MetricsAuxInfo},
console::{
self,
errors::{ApiError, WakeComputeError},
messages::MetricsAuxInfo,
},
error::io_error,
stream::{PqStream, Stream},
};
use anyhow::{bail, Context};
use futures::TryFutureExt;
use hyper::StatusCode;
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::sync::Arc;
use std::{ops::ControlFlow, sync::Arc};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
time,
@@ -25,7 +30,9 @@ use tracing::{error, info, warn};
use utils::measured_stream::MeasuredStream;
/// Number of times we should retry the `/proxy_wake_compute` http request.
pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10;
const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -315,7 +322,6 @@ async fn connect_to_compute_once(
node_info
.config
.connect(allow_self_signed_compute, timeout)
.inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info))
.await
}
@@ -328,11 +334,38 @@ async fn connect_to_compute(
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<PostgresConnection, compute::ConnectionError> {
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
let mut num_retries = 0;
let mut wait_duration = time::Duration::ZERO;
let mut should_wake_with_error = None;
loop {
// Apply startup params to the (possibly, cached) compute node info.
node_info.config.set_startup_params(params);
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
// try wake the compute node if we have determined it's sensible to do so
if let Some(err) = should_wake_with_error.take() {
match try_wake(node_info, extra, creds).await {
// we can't wake up the compute node
Ok(None) => return Err(err),
// there was an error communicating with the control plane
Err(e) => return Err(io_error(e).into()),
// failed to wake up but we can continue to retry
Ok(Some(ControlFlow::Continue(()))) => {
wait_duration = retry_after(num_retries);
should_wake_with_error = Some(err);
num_retries += 1;
info!(num_retries, "retrying wake compute");
continue;
}
// successfully woke up a compute node and can break the wakeup loop
Ok(Some(ControlFlow::Break(()))) => {}
}
}
// Set a shorter timeout for the initial connection attempt.
//
// In case we try to connect to an outdated address that is no longer valid, the
@@ -346,30 +379,91 @@ async fn connect_to_compute(
// We only use caching in case of scram proxy backed by the console, so reduce
// the timeout only in that case.
let is_scram_proxy = matches!(creds, auth::BackendType::Console(_, _));
let timeout = if is_scram_proxy && num_retries == NUM_RETRIES_WAKE_COMPUTE {
let timeout = if is_scram_proxy && num_retries == 0 {
time::Duration::from_secs(2)
} else {
time::Duration::from_secs(10)
};
// do this again to ensure we have username?
node_info.config.set_startup_params(params);
match connect_to_compute_once(node_info, timeout).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(extra).map_err(io_error).await? {
// Update `node_info` and try one more time.
Some(mut new) => {
new.config.reuse_password(&node_info.config);
*node_info = new;
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
Ok(res) => return Ok(res),
Err(e) => {
error!(error = ?e, "could not connect to compute node");
if !can_retry_error(&e, num_retries) {
return Err(e);
}
wait_duration = retry_after(num_retries);
// after the first connect failure,
// we should invalidate the cache and wake up a new compute node
if num_retries == 0 {
invalidate_cache(node_info);
should_wake_with_error = Some(e);
}
}
other => return other,
}
num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
num_retries += 1;
info!(num_retries, "retrying connect");
}
}
/// Attempts to wake up the compute node.
/// * Returns Ok(Some(true)) if there was an error waking but retries are acceptable
/// * Returns Ok(Some(false)) if the wakeup succeeded
/// * Returns Ok(None) or Err(e) if there was an error
pub async fn try_wake(
node_info: &mut console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<Option<ControlFlow<()>>, WakeComputeError> {
info!("compute node's state has likely changed; requesting a wake-up");
match creds.wake_compute(extra).await {
// retry wake if the compute was in an invalid state
Err(WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
})) => Ok(Some(ControlFlow::Continue(()))),
// Update `node_info` and try again.
Ok(Some(mut new)) => {
new.config.reuse_password(&node_info.config);
*node_info = new;
Ok(Some(ControlFlow::Break(())))
}
Err(e) => Err(e),
Ok(None) => Ok(None),
}
}
fn can_retry_error(err: &compute::ConnectionError, num_retries: u32) -> bool {
use std::io::ErrorKind;
match err {
// retry all errors at least once
_ if num_retries == 0 => true,
// keep retrying connection errors
compute::ConnectionError::CouldNotConnect(io_err)
if num_retries < NUM_RETRIES_WAKE_COMPUTE =>
{
matches!(
io_err.kind(),
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable
)
}
// otherwise, don't retry
_ => false,
}
}
pub fn retry_after(num_retries: u32) -> time::Duration {
match num_retries {
0 => time::Duration::ZERO,
_ => {
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)
}
}
}

View File

@@ -294,3 +294,13 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
Ok(())
}
#[test]
fn connect_compute_total_wait() {
let mut total_wait = tokio::time::Duration::ZERO;
for num_retries in 0..10 {
total_wait += retry_after(num_retries);
}
assert!(total_wait < tokio::time::Duration::from_secs(12));
assert!(total_wait > tokio::time::Duration::from_secs(10));
}