Files
neon/proxy/src/context.rs
Conrad Ludgate ec8dcc2231 flatten proxy flow (#6447)
## Problem

This takes the ideas from https://github.com/neondatabase/neon/pull/6283 and
applies them as less radical changes, split into smaller commits.

Proxy flow was quite deeply nested, which makes adding more interesting
error handling quite tricky.

## Summary of changes

I recommend reviewing commit by commit.

1. move handshake logic into a separate file
2. move passthrough logic into a separate file
3. no longer accept a closure in CancelMap session logic
4. Remove connect_to_db, copy logic into handle_client
5. flatten auth_and_wake_compute in authenticate
6. record info for link auth
2024-01-29 17:38:03 +00:00

123 lines
3.3 KiB
Rust

//! Connection request monitoring contexts
use chrono::Utc;
use once_cell::sync::OnceCell;
use smol_str::SmolStr;
use std::net::IpAddr;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::{
console::messages::MetricsAuxInfo, error::ErrorKind, metrics::LatencyTimer, BranchId,
EndpointId, ProjectId, RoleName,
};
pub mod parquet;
// Process-wide handle to the request-log channel.
//
// Only a *weak* sender is stored here; `RequestMonitoring::new` upgrades it to a strong
// sender for each request, and gets `None` if the cell is unset or the upgrade fails.
// NOTE(review): the code that initialises this cell is not in this part of the file —
// presumably the `parquet` submodule; confirm.
static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
#[derive(Clone)]
/// Context data for a single request to connect to a database.
///
/// This data should **not** be used for connection logic, only for observability and limiting purposes.
/// All connection logic should instead use strongly typed state machines, not a bunch of Options.
///
/// A context is created with `new`, enriched through the `set_*` methods, and reported
/// through `log` (which is also invoked from the `Drop` impl).
pub struct RequestMonitoring {
/// Peer IP address supplied to `new`.
pub peer_addr: IpAddr,
/// Session identifier supplied to `new`.
pub session_id: Uuid,
/// Protocol label supplied to `new`; used as the metrics label for
/// `CONNECTING_ENDPOINTS` and `LatencyTimer`, and as the suffix of
/// `console_application_name`.
pub protocol: &'static str,
/// UTC timestamp captured when the context is constructed in `new`.
first_packet: chrono::DateTime<Utc>,
/// Region label supplied to `new`.
region: &'static str,
// filled in as they are discovered
/// Set by `set_project`.
project: Option<ProjectId>,
/// Set by `set_project`.
branch: Option<BranchId>,
/// Set by either `set_project` or `set_endpoint_id`.
endpoint_id: Option<EndpointId>,
/// Set by `set_user`.
user: Option<RoleName>,
/// Set by `set_application`; a `None` argument there leaves the current value in place.
application: Option<SmolStr>,
/// Starts as `None`.
/// NOTE(review): no setter is visible in this part of the file — confirm where it is written.
error_kind: Option<ErrorKind>,
/// `false` until `set_success` is called.
success: bool,
// extra
// This sender is here to keep the request monitoring channel open while requests are taking place.
// It is obtained in `new` by upgrading `LOG_CHAN`, and is taken (left as `None`) by `log`.
sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
/// Latency timer created in `new` from the protocol label.
pub latency_timer: LatencyTimer,
}
impl RequestMonitoring {
pub fn new(
session_id: Uuid,
peer_addr: IpAddr,
protocol: &'static str,
region: &'static str,
) -> Self {
Self {
peer_addr,
session_id,
protocol,
first_packet: Utc::now(),
region,
project: None,
branch: None,
endpoint_id: None,
user: None,
application: None,
error_kind: None,
success: false,
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
latency_timer: LatencyTimer::new(protocol),
}
}
#[cfg(test)]
pub fn test() -> Self {
RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
}
pub fn console_application_name(&self) -> String {
format!(
"{}/{}",
self.application.as_deref().unwrap_or_default(),
self.protocol
)
}
pub fn set_project(&mut self, x: MetricsAuxInfo) {
self.branch = Some(x.branch_id);
self.endpoint_id = Some(x.endpoint_id);
self.project = Some(x.project_id);
}
pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
crate::metrics::CONNECTING_ENDPOINTS
.with_label_values(&[self.protocol])
.measure(&endpoint_id);
self.endpoint_id = Some(endpoint_id);
}
pub fn set_application(&mut self, app: Option<SmolStr>) {
self.application = app.or_else(|| self.application.clone());
}
pub fn set_user(&mut self, user: RoleName) {
self.user = Some(user);
}
pub fn set_success(&mut self) {
self.success = true;
}
pub fn log(&mut self) {
if let Some(tx) = self.sender.take() {
let _: Result<(), _> = tx.send(self.clone());
}
}
}
/// Reports the context when it goes out of scope.
///
/// `log` takes the stored sender, so this does nothing if the context was already
/// logged or never had a sender.
impl Drop for RequestMonitoring {
    fn drop(&mut self) {
        Self::log(self);
    }
}