mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 03:50:37 +00:00
[proxy] Add more context to console requests logging (#2583)
This commit is contained in:
@@ -8,36 +8,20 @@ use crate::{
|
||||
http, scram,
|
||||
stream::PqStream,
|
||||
};
|
||||
use futures::TryFutureExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::future::Future;
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::info;
|
||||
use tracing::{error, info, info_span};
|
||||
|
||||
const REQUEST_FAILED: &str = "Console request failed";
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum TransportError {
|
||||
#[error("Console responded with a malformed JSON: {0}")]
|
||||
BadResponse(#[from] serde_json::Error),
|
||||
#[error("{}", REQUEST_FAILED)]
|
||||
pub struct TransportError(#[from] std::io::Error);
|
||||
|
||||
/// HTTP status (other than 200) returned by the console.
|
||||
#[error("Console responded with an HTTP status: {0}")]
|
||||
HttpStatus(reqwest::StatusCode),
|
||||
|
||||
#[error(transparent)]
|
||||
Io(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
impl UserFacingError for TransportError {
|
||||
fn to_string_client(&self) -> String {
|
||||
use TransportError::*;
|
||||
match self {
|
||||
HttpStatus(_) => self.to_string(),
|
||||
_ => REQUEST_FAILED.to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl UserFacingError for TransportError {}
|
||||
|
||||
// Helps eliminate graceless `.map_err` calls without introducing another ctor.
|
||||
impl From<reqwest::Error> for TransportError {
|
||||
@@ -162,15 +146,19 @@ impl<'a> Api<'a> {
|
||||
])
|
||||
.build()?;
|
||||
|
||||
info!(id = request_id, url = req.url().as_str(), "request");
|
||||
let resp = self.endpoint.execute(req).await?;
|
||||
if !resp.status().is_success() {
|
||||
return Err(TransportError::HttpStatus(resp.status()).into());
|
||||
}
|
||||
let span = info_span!("http", id = request_id, url = req.url().as_str());
|
||||
info!(parent: &span, "request auth info");
|
||||
let msg = self
|
||||
.endpoint
|
||||
.checked_execute(req)
|
||||
.and_then(|r| r.json::<GetRoleSecretResponse>())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(parent: &span, "{e}");
|
||||
e
|
||||
})?;
|
||||
|
||||
let response: GetRoleSecretResponse = serde_json::from_str(&resp.text().await?)?;
|
||||
|
||||
scram::ServerSecret::parse(&response.role_secret)
|
||||
scram::ServerSecret::parse(&msg.role_secret)
|
||||
.map(AuthInfo::Scram)
|
||||
.ok_or(GetAuthInfoError::BadSecret)
|
||||
}
|
||||
@@ -189,17 +177,21 @@ impl<'a> Api<'a> {
|
||||
])
|
||||
.build()?;
|
||||
|
||||
info!(id = request_id, url = req.url().as_str(), "request");
|
||||
let resp = self.endpoint.execute(req).await?;
|
||||
if !resp.status().is_success() {
|
||||
return Err(TransportError::HttpStatus(resp.status()).into());
|
||||
}
|
||||
|
||||
let response: GetWakeComputeResponse = serde_json::from_str(&resp.text().await?)?;
|
||||
let span = info_span!("http", id = request_id, url = req.url().as_str());
|
||||
info!(parent: &span, "request wake-up");
|
||||
let msg = self
|
||||
.endpoint
|
||||
.checked_execute(req)
|
||||
.and_then(|r| r.json::<GetWakeComputeResponse>())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(parent: &span, "{e}");
|
||||
e
|
||||
})?;
|
||||
|
||||
// Unfortunately, ownership won't let us use `Option::ok_or` here.
|
||||
let (host, port) = match parse_host_port(&response.address) {
|
||||
None => return Err(WakeComputeError::BadComputeAddress(response.address)),
|
||||
let (host, port) = match parse_host_port(&msg.address) {
|
||||
None => return Err(WakeComputeError::BadComputeAddress(msg.address)),
|
||||
Some(x) => x,
|
||||
};
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::info;
|
||||
use tracing::{info, info_span};
|
||||
use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -51,11 +51,12 @@ pub async fn handle_user(
|
||||
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
) -> auth::Result<compute::NodeInfo> {
|
||||
let psql_session_id = new_psql_session_id();
|
||||
let span = info_span!("link", psql_session_id = &psql_session_id);
|
||||
let greeting = hello_message(link_uri, &psql_session_id);
|
||||
|
||||
let db_info = super::with_waiter(psql_session_id, |waiter| async {
|
||||
// Give user a URL to spawn a new database.
|
||||
info!("sending the auth URL to the user");
|
||||
info!(parent: &span, "sending the auth URL to the user");
|
||||
client
|
||||
.write_message_noflush(&Be::AuthenticationOk)?
|
||||
.write_message_noflush(&BeParameterStatusMessage::encoding())?
|
||||
@@ -63,7 +64,7 @@ pub async fn handle_user(
|
||||
.await?;
|
||||
|
||||
// Wait for web console response (see `mgmt`).
|
||||
info!("waiting for console's reply...");
|
||||
info!(parent: &span, "waiting for console's reply...");
|
||||
waiter.await?.map_err(LinkAuthError::AuthFailed)
|
||||
})
|
||||
.await?;
|
||||
|
||||
@@ -17,6 +17,7 @@ impl Endpoint {
|
||||
Self { endpoint, client }
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn url(&self) -> &ApiUrl {
|
||||
&self.endpoint
|
||||
}
|
||||
@@ -36,6 +37,16 @@ impl Endpoint {
|
||||
) -> Result<reqwest::Response, reqwest::Error> {
|
||||
self.client.execute(request).await
|
||||
}
|
||||
|
||||
/// Execute a [request](reqwest::Request) and raise an error if status != 200.
|
||||
pub async fn checked_execute(
|
||||
&self,
|
||||
request: reqwest::Request,
|
||||
) -> Result<reqwest::Response, reqwest::Error> {
|
||||
self.execute(request)
|
||||
.await
|
||||
.and_then(|r| r.error_for_status())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::auth;
|
||||
use crate::cancellation::{self, CancelMap};
|
||||
use crate::config::{ProxyConfig, TlsConfig};
|
||||
use crate::stream::{MetricsStream, PqStream, Stream};
|
||||
use crate::stream::{MeasuredStream, PqStream, Stream};
|
||||
use anyhow::{bail, Context};
|
||||
use futures::TryFutureExt;
|
||||
use metrics::{register_int_counter, IntCounter};
|
||||
@@ -64,7 +64,7 @@ pub async fn task_main(
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
loop {
|
||||
let (socket, peer_addr) = listener.accept().await?;
|
||||
info!("accepted connection from {peer_addr}");
|
||||
info!("accepted postgres client connection from {peer_addr}");
|
||||
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancel_map = Arc::clone(&cancel_map);
|
||||
@@ -270,8 +270,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
info!("performing the proxy pass...");
|
||||
let mut db = MetricsStream::new(db.stream, inc_proxied);
|
||||
let mut client = MetricsStream::new(stream.into_inner(), inc_proxied);
|
||||
let mut db = MeasuredStream::new(db.stream, inc_proxied);
|
||||
let mut client = MeasuredStream::new(stream.into_inner(), inc_proxied);
|
||||
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -231,7 +231,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for Stream<S> {
|
||||
pin_project! {
|
||||
/// This stream tracks all writes and calls user provided
|
||||
/// callback when the underlying stream is flushed.
|
||||
pub struct MetricsStream<S, W> {
|
||||
pub struct MeasuredStream<S, W> {
|
||||
#[pin]
|
||||
stream: S,
|
||||
write_count: usize,
|
||||
@@ -239,7 +239,7 @@ pin_project! {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, W> MetricsStream<S, W> {
|
||||
impl<S, W> MeasuredStream<S, W> {
|
||||
pub fn new(stream: S, inc_write_count: W) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
@@ -249,7 +249,7 @@ impl<S, W> MetricsStream<S, W> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + Unpin, W> AsyncRead for MetricsStream<S, W> {
|
||||
impl<S: AsyncRead + Unpin, W> AsyncRead for MeasuredStream<S, W> {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
context: &mut task::Context<'_>,
|
||||
@@ -259,7 +259,7 @@ impl<S: AsyncRead + Unpin, W> AsyncRead for MetricsStream<S, W> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncWrite + Unpin, W: FnMut(usize)> AsyncWrite for MetricsStream<S, W> {
|
||||
impl<S: AsyncWrite + Unpin, W: FnMut(usize)> AsyncWrite for MeasuredStream<S, W> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
context: &mut task::Context<'_>,
|
||||
|
||||
Reference in New Issue
Block a user