Add timeout on proxy -> compute connection establishment.

Otherwise we sit up to default tcp_syn_retries (about 2+ min) before gettings os
error 110 if compute has been migrated to another pod.
This commit is contained in:
Arseny Sher
2023-04-25 12:22:58 +04:00
committed by Arseny Sher
parent 92214578af
commit 0112a602e1

View File

@@ -1,8 +1,8 @@
use crate::{cancellation::CancelClosure, error::UserFacingError};
use futures::TryFutureExt;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr};
use std::{io, net::SocketAddr, time::Duration};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
@@ -130,9 +130,23 @@ impl ConnCfg {
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream)> {
use tokio_postgres::config::Host;
// wrap TcpStream::connect with timeout
let connect_with_timeout = |host, port| {
let connection_timeout = Duration::from_millis(10000);
tokio::time::timeout(connection_timeout, TcpStream::connect((host, port))).map(
move |res| match res {
Ok(tcpstream_connect_res) => tcpstream_connect_res,
Err(_) => Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("exceeded connection timeout {connection_timeout:?}"),
)),
},
)
};
let connect_once = |host, port| {
info!("trying to connect to compute node at {host}:{port}");
TcpStream::connect((host, port)).and_then(|socket| async {
connect_with_timeout(host, port).and_then(|socket| async {
let socket_addr = socket.peer_addr()?;
// This prevents load balancer from severing the connection.
socket2::SockRef::from(&socket).set_keepalive(true)?;
@@ -165,7 +179,6 @@ impl ConnCfg {
Host::Unix(_) => continue, // unix sockets are not welcome here
};
// TODO: maybe we should add a timeout.
match connect_once(host, *port).await {
Ok(socket) => return Ok(socket),
Err(err) => {