From a7a0c3cd278c485a027620cfd373d6b9ca7e6c0c Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Wed, 14 Jun 2023 19:24:46 +0300 Subject: [PATCH] Invalidate proxy cache in http-over-sql (#4500) HTTP queries failed with errors `error connecting to server: failed to lookup address information: Name or service not known\n\nCaused by:\n failed to lookup address information: Name or service not known` The fix reused cache invalidation logic in proxy from usual postgres connections and added it to HTTP-over-SQL queries. Also removed a timeout for HTTP request, because it almost never worked on staging (50s+ time just to start the compute), and we can have the similar case in production. Should be ok, since we have a limits for the requests and responses. --- proxy/src/http/sql_over_http.rs | 136 +++++++++++++++++++++++--------- proxy/src/http/websocket.rs | 12 +-- proxy/src/proxy.rs | 39 ++++----- 3 files changed, 120 insertions(+), 67 deletions(-) diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index 1007532a96..e8ad2d04f3 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -1,5 +1,6 @@ use futures::pin_mut; use futures::StreamExt; +use futures::TryFutureExt; use hyper::body::HttpBody; use hyper::http::HeaderName; use hyper::http::HeaderValue; @@ -11,8 +12,13 @@ use serde_json::Value; use tokio_postgres::types::Kind; use tokio_postgres::types::Type; use tokio_postgres::Row; +use tracing::error; +use tracing::info; +use tracing::instrument; use url::Url; +use crate::proxy::invalidate_cache; +use crate::proxy::NUM_RETRIES_WAKE_COMPUTE; use crate::{auth, config::ProxyConfig, console}; #[derive(serde::Deserialize)] @@ -90,10 +96,17 @@ fn json_array_to_pg_array(value: &Value) -> Result, serde_json::E } } +struct ConnInfo { + username: String, + dbname: String, + hostname: String, + password: String, +} + fn get_conn_info( headers: &HeaderMap, sni_hostname: Option, -) -> Result<(String, String, String, String), anyhow::Error> { +) -> Result { let connection_string = headers .get("Neon-Connection-String") .ok_or(anyhow::anyhow!("missing connection string"))? @@ -146,12 +159,12 @@ fn get_conn_info( } } - Ok(( - username.to_owned(), - dbname.to_owned(), - hostname.to_owned(), - password.to_owned(), - )) + Ok(ConnInfo { + username: username.to_owned(), + dbname: dbname.to_owned(), + hostname: hostname.to_owned(), + password: password.to_owned(), + }) } // TODO: return different http error codes @@ -164,10 +177,10 @@ pub async fn handle( // Determine the destination and connection params // let headers = request.headers(); - let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?; + let conn_info = get_conn_info(headers, sni_hostname)?; let credential_params = StartupMessageParams::new([ - ("user", &username), - ("database", &dbname), + ("user", &conn_info.username), + ("database", &conn_info.dbname), ("application_name", APP_NAME), ]); @@ -186,21 +199,20 @@ pub async fn handle( let creds = config .auth_backend .as_ref() - .map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names)) + .map(|_| { + auth::ClientCredentials::parse( + &credential_params, + Some(&conn_info.hostname), + common_names, + ) + }) .transpose()?; let extra = console::ConsoleReqExtra { session_id: uuid::Uuid::new_v4(), application_name: Some(APP_NAME), }; - let node = creds.wake_compute(&extra).await?.expect("msg"); - let conf = node.value.config; - let port = *conf.get_ports().first().expect("no port"); - let host = match conf.get_hosts().first().expect("no host") { - tokio_postgres::config::Host::Tcp(host) => host, - tokio_postgres::config::Host::Unix(_) => { - return Err(anyhow::anyhow!("unix socket is not supported")); - } - }; + + let mut node_info = creds.wake_compute(&extra).await?.expect("msg"); let request_content_length = match request.body().size_hint().upper() { Some(v) => v, @@ -220,28 +232,10 @@ pub async fn handle( let QueryData { query, params } = serde_json::from_slice(&body)?; let query_params = json_to_pg_text(params)?; - // - // Connenct to the destination - // - let (client, connection) = tokio_postgres::Config::new() - .host(host) - .port(port) - .user(&username) - .password(&password) - .dbname(&dbname) - .max_backend_message_size(MAX_RESPONSE_SIZE) - .connect(tokio_postgres::NoTls) - .await?; - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - // // Now execute the query and return the result // + let client = connect_to_compute(&mut node_info, &extra, &creds, &conn_info).await?; let row_stream = client.query_raw_txt(query, query_params).await?; // Manually drain the stream into a vector to leave row_stream hanging @@ -308,6 +302,70 @@ pub async fn handle( })) } +/// This function is a copy of `connect_to_compute` from `src/proxy.rs` with +/// the difference that it uses `tokio_postgres` for the connection. +#[instrument(skip_all)] +async fn connect_to_compute( + node_info: &mut console::CachedNodeInfo, + extra: &console::ConsoleReqExtra<'_>, + creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, + conn_info: &ConnInfo, +) -> anyhow::Result { + let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE; + + loop { + match connect_to_compute_once(node_info, conn_info).await { + Err(e) if num_retries > 0 => { + info!("compute node's state has changed; requesting a wake-up"); + match creds.wake_compute(extra).await? { + // Update `node_info` and try one more time. + Some(new) => { + *node_info = new; + } + // Link auth doesn't work that way, so we just exit. + None => return Err(e), + } + } + other => return other, + } + + num_retries -= 1; + info!("retrying after wake-up ({num_retries} attempts left)"); + } +} + +async fn connect_to_compute_once( + node_info: &console::CachedNodeInfo, + conn_info: &ConnInfo, +) -> anyhow::Result { + let mut config = (*node_info.config).clone(); + + let (client, connection) = config + .user(&conn_info.username) + .password(&conn_info.password) + .dbname(&conn_info.dbname) + .max_backend_message_size(MAX_RESPONSE_SIZE) + .connect(tokio_postgres::NoTls) + .inspect_err(|e: &tokio_postgres::Error| { + error!( + "failed to connect to compute node hosts={:?} ports={:?}: {}", + node_info.config.get_hosts(), + node_info.config.get_ports(), + e + ); + invalidate_cache(node_info) + }) + .await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("connection error: {}", e); + } + }); + + Ok(client) +} + // // Convert postgres row with text-encoded values to JSON object // diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs index fbb602e3d2..9f467aceb7 100644 --- a/proxy/src/http/websocket.rs +++ b/proxy/src/http/websocket.rs @@ -26,7 +26,6 @@ use tls_listener::TlsListener; use tokio::{ io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}, net::TcpListener, - select, }; use tokio_util::sync::CancellationToken; use tracing::{error, info, info_span, warn, Instrument}; @@ -193,14 +192,9 @@ async fn ws_handler( // TODO: that deserves a refactor as now this function also handles http json client besides websockets. // Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead. } else if request.uri().path() == "/sql" && request.method() == Method::POST { - let result = select! { - _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => { - Err(anyhow::anyhow!("Query timed out")) - } - response = sql_over_http::handle(config, request, sni_hostname) => { - response - } - }; + let result = sql_over_http::handle(config, request, sni_hostname) + .instrument(info_span!("sql-over-http")) + .await; let status_code = match result { Ok(_) => StatusCode::OK, Err(_) => StatusCode::BAD_REQUEST, diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index cf2dd000db..8efb7005c8 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -22,7 +22,7 @@ use tracing::{error, info, warn}; use utils::measured_stream::MeasuredStream; /// Number of times we should retry the `/proxy_wake_compute` http request. -const NUM_RETRIES_WAKE_COMPUTE: usize = 1; +pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; @@ -283,34 +283,35 @@ async fn handshake( } } +/// If we couldn't connect, a cached connection info might be to blame +/// (e.g. the compute node's address might've changed at the wrong time). +/// Invalidate the cache entry (if any) to prevent subsequent errors. +#[tracing::instrument(name = "invalidate_cache", skip_all)] +pub fn invalidate_cache(node_info: &console::CachedNodeInfo) { + let is_cached = node_info.cached(); + if is_cached { + warn!("invalidating stalled compute node info cache entry"); + node_info.invalidate(); + } + + let label = match is_cached { + true => "compute_cached", + false => "compute_uncached", + }; + NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); +} + /// Try to connect to the compute node once. #[tracing::instrument(name = "connect_once", skip_all)] async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, ) -> Result { - // If we couldn't connect, a cached connection info might be to blame - // (e.g. the compute node's address might've changed at the wrong time). - // Invalidate the cache entry (if any) to prevent subsequent errors. - let invalidate_cache = |_: &compute::ConnectionError| { - let is_cached = node_info.cached(); - if is_cached { - warn!("invalidating stalled compute node info cache entry"); - node_info.invalidate(); - } - - let label = match is_cached { - true => "compute_cached", - false => "compute_uncached", - }; - NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); - }; - let allow_self_signed_compute = node_info.allow_self_signed_compute; node_info .config .connect(allow_self_signed_compute) - .inspect_err(invalidate_cache) + .inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info)) .await }