From 6ace79345de41e4569856c7ad6978cb1bb1e1765 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Wed, 12 Oct 2022 21:00:44 +0300 Subject: [PATCH] [proxy] Add more context to console requests logging (#2583) --- proxy/src/auth/backend/console.rs | 68 ++++++++++++++----------------- proxy/src/auth/backend/link.rs | 7 ++-- proxy/src/http.rs | 11 +++++ proxy/src/proxy.rs | 8 ++-- proxy/src/stream.rs | 8 ++-- 5 files changed, 53 insertions(+), 49 deletions(-) diff --git a/proxy/src/auth/backend/console.rs b/proxy/src/auth/backend/console.rs index 7dbb173b88..cf99aa08ef 100644 --- a/proxy/src/auth/backend/console.rs +++ b/proxy/src/auth/backend/console.rs @@ -8,36 +8,20 @@ use crate::{ http, scram, stream::PqStream, }; +use futures::TryFutureExt; use serde::{Deserialize, Serialize}; use std::future::Future; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::info; +use tracing::{error, info, info_span}; const REQUEST_FAILED: &str = "Console request failed"; #[derive(Debug, Error)] -pub enum TransportError { - #[error("Console responded with a malformed JSON: {0}")] - BadResponse(#[from] serde_json::Error), +#[error("{}", REQUEST_FAILED)] +pub struct TransportError(#[from] std::io::Error); - /// HTTP status (other than 200) returned by the console. - #[error("Console responded with an HTTP status: {0}")] - HttpStatus(reqwest::StatusCode), - - #[error(transparent)] - Io(#[from] std::io::Error), -} - -impl UserFacingError for TransportError { - fn to_string_client(&self) -> String { - use TransportError::*; - match self { - HttpStatus(_) => self.to_string(), - _ => REQUEST_FAILED.to_owned(), - } - } -} +impl UserFacingError for TransportError {} // Helps eliminate graceless `.map_err` calls without introducing another ctor. impl From for TransportError { @@ -162,15 +146,19 @@ impl<'a> Api<'a> { ]) .build()?; - info!(id = request_id, url = req.url().as_str(), "request"); - let resp = self.endpoint.execute(req).await?; - if !resp.status().is_success() { - return Err(TransportError::HttpStatus(resp.status()).into()); - } + let span = info_span!("http", id = request_id, url = req.url().as_str()); + info!(parent: &span, "request auth info"); + let msg = self + .endpoint + .checked_execute(req) + .and_then(|r| r.json::()) + .await + .map_err(|e| { + error!(parent: &span, "{e}"); + e + })?; - let response: GetRoleSecretResponse = serde_json::from_str(&resp.text().await?)?; - - scram::ServerSecret::parse(&response.role_secret) + scram::ServerSecret::parse(&msg.role_secret) .map(AuthInfo::Scram) .ok_or(GetAuthInfoError::BadSecret) } @@ -189,17 +177,21 @@ impl<'a> Api<'a> { ]) .build()?; - info!(id = request_id, url = req.url().as_str(), "request"); - let resp = self.endpoint.execute(req).await?; - if !resp.status().is_success() { - return Err(TransportError::HttpStatus(resp.status()).into()); - } - - let response: GetWakeComputeResponse = serde_json::from_str(&resp.text().await?)?; + let span = info_span!("http", id = request_id, url = req.url().as_str()); + info!(parent: &span, "request wake-up"); + let msg = self + .endpoint + .checked_execute(req) + .and_then(|r| r.json::()) + .await + .map_err(|e| { + error!(parent: &span, "{e}"); + e + })?; // Unfortunately, ownership won't let us use `Option::ok_or` here. - let (host, port) = match parse_host_port(&response.address) { - None => return Err(WakeComputeError::BadComputeAddress(response.address)), + let (host, port) = match parse_host_port(&msg.address) { + None => return Err(WakeComputeError::BadComputeAddress(msg.address)), Some(x) => x, }; diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs index 863ed53645..c8ca418144 100644 --- a/proxy/src/auth/backend/link.rs +++ b/proxy/src/auth/backend/link.rs @@ -1,7 +1,7 @@ use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::info; +use tracing::{info, info_span}; use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage}; #[derive(Debug, Error)] @@ -51,11 +51,12 @@ pub async fn handle_user( client: &mut PqStream, ) -> auth::Result { let psql_session_id = new_psql_session_id(); + let span = info_span!("link", psql_session_id = &psql_session_id); let greeting = hello_message(link_uri, &psql_session_id); let db_info = super::with_waiter(psql_session_id, |waiter| async { // Give user a URL to spawn a new database. - info!("sending the auth URL to the user"); + info!(parent: &span, "sending the auth URL to the user"); client .write_message_noflush(&Be::AuthenticationOk)? .write_message_noflush(&BeParameterStatusMessage::encoding())? @@ -63,7 +64,7 @@ pub async fn handle_user( .await?; // Wait for web console response (see `mgmt`). - info!("waiting for console's reply..."); + info!(parent: &span, "waiting for console's reply..."); waiter.await?.map_err(LinkAuthError::AuthFailed) }) .await?; diff --git a/proxy/src/http.rs b/proxy/src/http.rs index dbeb3dc784..6f9145678b 100644 --- a/proxy/src/http.rs +++ b/proxy/src/http.rs @@ -17,6 +17,7 @@ impl Endpoint { Self { endpoint, client } } + #[inline(always)] pub fn url(&self) -> &ApiUrl { &self.endpoint } @@ -36,6 +37,16 @@ impl Endpoint { ) -> Result { self.client.execute(request).await } + + /// Execute a [request](reqwest::Request) and raise an error if status != 200. + pub async fn checked_execute( + &self, + request: reqwest::Request, + ) -> Result { + self.execute(request) + .await + .and_then(|r| r.error_for_status()) + } } #[cfg(test)] diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 5dcaa000cf..889445239a 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -1,7 +1,7 @@ use crate::auth; use crate::cancellation::{self, CancelMap}; use crate::config::{ProxyConfig, TlsConfig}; -use crate::stream::{MetricsStream, PqStream, Stream}; +use crate::stream::{MeasuredStream, PqStream, Stream}; use anyhow::{bail, Context}; use futures::TryFutureExt; use metrics::{register_int_counter, IntCounter}; @@ -64,7 +64,7 @@ pub async fn task_main( let cancel_map = Arc::new(CancelMap::default()); loop { let (socket, peer_addr) = listener.accept().await?; - info!("accepted connection from {peer_addr}"); + info!("accepted postgres client connection from {peer_addr}"); let session_id = uuid::Uuid::new_v4(); let cancel_map = Arc::clone(&cancel_map); @@ -270,8 +270,8 @@ impl Client<'_, S> { // Starting from here we only proxy the client's traffic. info!("performing the proxy pass..."); - let mut db = MetricsStream::new(db.stream, inc_proxied); - let mut client = MetricsStream::new(stream.into_inner(), inc_proxied); + let mut db = MeasuredStream::new(db.stream, inc_proxied); + let mut client = MeasuredStream::new(stream.into_inner(), inc_proxied); let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?; Ok(()) diff --git a/proxy/src/stream.rs b/proxy/src/stream.rs index 54ff8bcc07..2a224944e2 100644 --- a/proxy/src/stream.rs +++ b/proxy/src/stream.rs @@ -231,7 +231,7 @@ impl AsyncWrite for Stream { pin_project! { /// This stream tracks all writes and calls user provided /// callback when the underlying stream is flushed. - pub struct MetricsStream { + pub struct MeasuredStream { #[pin] stream: S, write_count: usize, @@ -239,7 +239,7 @@ pin_project! { } } -impl MetricsStream { +impl MeasuredStream { pub fn new(stream: S, inc_write_count: W) -> Self { Self { stream, @@ -249,7 +249,7 @@ impl MetricsStream { } } -impl AsyncRead for MetricsStream { +impl AsyncRead for MeasuredStream { fn poll_read( self: Pin<&mut Self>, context: &mut task::Context<'_>, @@ -259,7 +259,7 @@ impl AsyncRead for MetricsStream { } } -impl AsyncWrite for MetricsStream { +impl AsyncWrite for MeasuredStream { fn poll_write( self: Pin<&mut Self>, context: &mut task::Context<'_>,