neon/proxy/src/context.rs
Conrad Ludgate 8a646cb750 proxy: add request context for observability and blocking (#6160)
## Summary of changes

### RequestMonitoring

We want to add an event stream with information on each request, for
easier analysis than we can do with diagnostic logs alone
(https://github.com/neondatabase/cloud/issues/8807). This
`RequestMonitoring` keeps a record of the final state of a request. On
drop, it is pushed into a queue to be uploaded.
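A minimal sketch of the record-on-drop pattern, using only the standard library. `Ctx` is a hypothetical, simplified stand-in for `RequestMonitoring`, and `std::sync::mpsc` replaces the tokio unbounded channel the real type uses. It shows why the final state is sent exactly once: the sender is taken before cloning, so the queued snapshot carries no sender and dropping it later sends nothing.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for `RequestMonitoring`: a few optional fields
/// plus a sender used to emit the final record.
#[derive(Clone, Debug)]
struct Ctx {
    session: u32,
    endpoint_id: Option<String>,
    sender: Option<Sender<Ctx>>,
}

impl Ctx {
    fn new(session: u32, tx: &Sender<Ctx>) -> Self {
        Ctx { session, endpoint_id: None, sender: Some(tx.clone()) }
    }

    /// Send a snapshot at most once. `take()` clears the sender before cloning,
    /// so the snapshot has `sender: None` and dropping it sends nothing.
    fn log(&mut self) {
        if let Some(tx) = self.sender.take() {
            // A closed receiver must not affect the request, so ignore the error.
            let _ = tx.send(self.clone());
        }
    }
}

impl Drop for Ctx {
    fn drop(&mut self) {
        self.log();
    }
}

/// Runs two requests and returns the records that reached the queue.
fn demo() -> Vec<Ctx> {
    let (tx, rx): (Sender<Ctx>, Receiver<Ctx>) = channel();
    {
        let mut ctx = Ctx::new(1, &tx);
        ctx.endpoint_id = Some("ep-example".to_string());
        // `ctx` leaves scope here and its Drop impl pushes the final state.
    }
    let mut early = Ctx::new(2, &tx);
    early.log(); // logged explicitly...
    drop(early); // ...and not logged a second time on drop
    let records: Vec<Ctx> = rx.try_iter().collect();
    records
}

fn main() {
    for record in demo() {
        println!("session={} endpoint={:?}", record.session, record.endpoint_id);
    }
}
```

Dropping the received snapshots is harmless for the same reason: their `sender` is already `None`.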

Because this context is a bag of data, I don't want this information to
impact the logic of request handling. I personally think that weakly typed
data (such as all these `Option`s) makes for spaghetti code. I will,
however, allow this data to impact rate-limiting and blocking of
requests, as this does not _really_ change how a request is handled.
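To illustrate the split, here is a hypothetical blocking check that reads the optional context fields only to gate a request. `Ctx`, `Blocklist`, `Decision`, and `check` are illustrative names, not part of the proxy. A field that has not been discovered yet (`None`) just means a rule cannot match, which is why optional data is tolerable for limiting but would be awkward for connection logic.

```rust
use std::collections::HashSet;
use std::net::IpAddr;

/// Hypothetical, trimmed-down request context. Like `RequestMonitoring`,
/// some fields stay `None` until they are discovered.
struct Ctx {
    peer_addr: IpAddr,
    endpoint_id: Option<String>,
}

/// Hypothetical blocklist consulted before a request proceeds.
struct Blocklist {
    ips: HashSet<IpAddr>,
    endpoints: HashSet<String>,
}

#[derive(Debug, PartialEq)]
enum Decision {
    Allow,
    Block(&'static str),
}

/// Reads the context only to gate the request; it never chooses how the
/// connection itself is handled.
fn check(ctx: &Ctx, list: &Blocklist) -> Decision {
    if list.ips.contains(&ctx.peer_addr) {
        return Decision::Block("peer address is blocked");
    }
    // An endpoint that is not known yet simply means this rule cannot match.
    match ctx.endpoint_id.as_deref() {
        Some(ep) if list.endpoints.contains(ep) => Decision::Block("endpoint is blocked"),
        _ => Decision::Allow,
    }
}

fn sample_blocklist() -> Blocklist {
    Blocklist {
        ips: HashSet::from([IpAddr::from([10, 0, 0, 1])]),
        endpoints: HashSet::from(["ep-blocked".to_string()]),
    }
}

fn main() {
    let list = sample_blocklist();
    let ctx = Ctx { peer_addr: IpAddr::from([192, 0, 2, 7]), endpoint_id: None };
    println!("{:?}", check(&ctx, &list));
}
```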

### Parquet

Each `RequestMonitoring` is flushed into a channel where it is converted
into `RequestData`, which is accumulated into parquet files. Each row
group holds a certain number of rows, and once enough row groups have
filled up a file, we upload it to S3.
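The size-based part of this accumulation can be sketched without any Parquet library. `Batcher`, `rows_per_group`, and `groups_per_file` are hypothetical names; a real implementation would hand each closed row group to a Parquet writer rather than buffering plain `Vec`s.

```rust
use std::mem;

/// Hypothetical accumulator for the row-group/file layout described above.
/// Rows are buffered into row groups; once `groups_per_file` row groups are
/// full, the finished "file" is returned so the caller can upload it.
struct Batcher<T> {
    rows_per_group: usize,
    groups_per_file: usize,
    current_group: Vec<T>,
    current_file: Vec<Vec<T>>,
}

impl<T> Batcher<T> {
    fn new(rows_per_group: usize, groups_per_file: usize) -> Self {
        assert!(rows_per_group > 0 && groups_per_file > 0, "limits must be non-zero");
        Batcher {
            rows_per_group,
            groups_per_file,
            current_group: Vec::with_capacity(rows_per_group),
            current_file: Vec::with_capacity(groups_per_file),
        }
    }

    /// Adds one row. Returns `Some(file)` when this row completes the last
    /// row group of the current file, otherwise `None`.
    fn push(&mut self, row: T) -> Option<Vec<Vec<T>>> {
        self.current_group.push(row);
        if self.current_group.len() < self.rows_per_group {
            return None;
        }
        // The row group is full: close it and start an empty one.
        let group = mem::replace(&mut self.current_group, Vec::with_capacity(self.rows_per_group));
        self.current_file.push(group);
        if self.current_file.len() < self.groups_per_file {
            return None;
        }
        // The file is full: hand it off and start a new one.
        Some(mem::replace(&mut self.current_file, Vec::with_capacity(self.groups_per_file)))
    }
}

/// Feeds 13 rows through a batcher with 3 rows per group and 2 groups per file.
fn demo() -> Vec<Vec<Vec<u32>>> {
    let mut batcher = Batcher::new(3, 2);
    let mut files = Vec::new();
    for row in 0..13 {
        if let Some(file) = batcher.push(row) {
            files.push(file);
        }
    }
    files
}

fn main() {
    for (i, file) in demo().iter().enumerate() {
        println!("file {i}: {file:?}");
    }
}
```

In the demo, rows 0 to 11 fill two files and row 12 stays buffered in the next row group.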

We will also upload smaller files if they take too long to construct.
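A sketch of the "upload early if it takes too long" rule, assuming a hypothetical `FileBuilder` that starts an age clock when the first row of a file arrives. The thresholds and the timer that drives `poll` are illustrative; the actual limits used by the proxy are not described here.

```rust
use std::mem;
use std::time::{Duration, Instant};

/// Hypothetical file builder that uploads a file when it is full or when it
/// has been open for `max_age`, whichever comes first. The current time is
/// passed in explicitly so the policy can be exercised without sleeping.
struct FileBuilder<T> {
    max_rows: usize,
    max_age: Duration,
    opened_at: Option<Instant>,
    rows: Vec<T>,
}

impl<T> FileBuilder<T> {
    fn new(max_rows: usize, max_age: Duration) -> Self {
        FileBuilder { max_rows, max_age, opened_at: None, rows: Vec::new() }
    }

    /// Adds a row; returns a finished file if the size or age limit is hit.
    fn push(&mut self, row: T, now: Instant) -> Option<Vec<T>> {
        // The age clock starts with the first row of a new file.
        if self.opened_at.is_none() {
            self.opened_at = Some(now);
        }
        self.rows.push(row);
        if self.rows.len() >= self.max_rows {
            return self.flush();
        }
        self.poll(now)
    }

    /// Meant to be called periodically (e.g. from a timer): returns the
    /// current, possibly small, file once it has been open for `max_age`.
    fn poll(&mut self, now: Instant) -> Option<Vec<T>> {
        match self.opened_at {
            Some(opened) if now.duration_since(opened) >= self.max_age => self.flush(),
            _ => None,
        }
    }

    fn flush(&mut self) -> Option<Vec<T>> {
        self.opened_at = None;
        if self.rows.is_empty() {
            None
        } else {
            Some(mem::take(&mut self.rows))
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut builder = FileBuilder::new(1_000, Duration::from_secs(60));
    // One row: far from full, so nothing is returned yet.
    assert!(builder.push("row-a", start).is_none());
    // Still far from full, but old enough: uploaded as a small file.
    let file = builder.poll(start + Duration::from_secs(61));
    println!("{file:?}");
}
```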
2024-01-08 11:42:43 +00:00


//! Connection request monitoring contexts

use chrono::Utc;
use once_cell::sync::OnceCell;
use smol_str::SmolStr;
use std::net::IpAddr;
use tokio::sync::mpsc;
use uuid::Uuid;

use crate::{console::messages::MetricsAuxInfo, error::ErrorKind, metrics::LatencyTimer};

pub mod parquet;

static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();

/// Context data for a single request to connect to a database.
///
/// This data should **not** be used for connection logic, only for observability and limiting purposes.
/// All connection logic should instead use strongly typed state machines, not a bunch of Options.
#[derive(Clone)]
pub struct RequestMonitoring {
    pub peer_addr: IpAddr,
    pub session_id: Uuid,
    pub protocol: &'static str,
    first_packet: chrono::DateTime<Utc>,
    region: &'static str,

    // filled in as they are discovered
    project: Option<SmolStr>,
    branch: Option<SmolStr>,
    endpoint_id: Option<SmolStr>,
    user: Option<SmolStr>,
    application: Option<SmolStr>,
    error_kind: Option<ErrorKind>,

    // extra
    // This sender is here to keep the request monitoring channel open while requests are taking place.
    sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
    pub latency_timer: LatencyTimer,
}

impl RequestMonitoring {
    pub fn new(
        session_id: Uuid,
        peer_addr: IpAddr,
        protocol: &'static str,
        region: &'static str,
    ) -> Self {
        Self {
            peer_addr,
            session_id,
            protocol,
            first_packet: Utc::now(),
            region,

            project: None,
            branch: None,
            endpoint_id: None,
            user: None,
            application: None,
            error_kind: None,

            // Only attach a sender if the logging channel has been set up and is still open.
            sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
            latency_timer: LatencyTimer::new(protocol),
        }
    }

    #[cfg(test)]
    pub fn test() -> Self {
        RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
    }

    /// Application name reported to the console, formatted as `<application>/<protocol>`.
    pub fn console_application_name(&self) -> String {
        format!(
            "{}/{}",
            self.application.as_deref().unwrap_or_default(),
            self.protocol
        )
    }

    pub fn set_project(&mut self, x: MetricsAuxInfo) {
        self.branch = Some(x.branch_id);
        self.endpoint_id = Some(x.endpoint_id);
        self.project = Some(x.project_id);
    }

    /// Record the endpoint ID. `None` keeps any previously recorded value.
    pub fn set_endpoint_id(&mut self, endpoint_id: Option<SmolStr>) {
        self.endpoint_id = endpoint_id.or_else(|| self.endpoint_id.clone());
    }

    /// Record the application name. `None` keeps any previously recorded value.
    pub fn set_application(&mut self, app: Option<SmolStr>) {
        self.application = app.or_else(|| self.application.clone());
    }

    pub fn set_user(&mut self, user: SmolStr) {
        self.user = Some(user);
    }

    /// Push a snapshot of this context into the log channel.
    ///
    /// This happens at most once: the sender is taken before cloning, so the
    /// snapshot carries no sender and later calls (including the one in `Drop`)
    /// do nothing. A send error (closed channel) is ignored.
    pub fn log(&mut self) {
        if let Some(tx) = self.sender.take() {
            let _: Result<(), _> = tx.send(self.clone());
        }
    }
}

impl Drop for RequestMonitoring {
    fn drop(&mut self) {
        self.log()
    }
}
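`LOG_CHAN` stores only a weak handle, and each `RequestMonitoring` upgrades it into the strong `sender` field, which (per the field's comment) keeps the channel open while requests are in flight. Where the primary strong sender lives is not shown in this file (presumably in the `parquet` module). The sketch below is a std-only analog, since `std::sync::mpsc` has no weak sender: a `Weak<Sender<_>>` stands in for tokio's `WeakUnboundedSender`, and `Request` is a hypothetical stand-in for `RequestMonitoring`. Once the owner drops its sender, in-flight requests still keep the channel open; when the last one finishes, the consumer's loop ends and later requests cannot log.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Weak};
use std::thread;

/// Hypothetical per-request handle. Upgrading the weak handle gives the
/// request a strong sender, which keeps the channel open until it is dropped.
struct Request {
    id: u32,
    sender: Option<Arc<Sender<u32>>>,
}

impl Request {
    fn new(id: u32, log_chan: &Weak<Sender<u32>>) -> Self {
        // `None` if no strong sender is left, i.e. logging has shut down.
        Request { id, sender: log_chan.upgrade() }
    }
}

impl Drop for Request {
    fn drop(&mut self) {
        if let Some(tx) = self.sender.take() {
            let _ = tx.send(self.id);
        }
    }
}

/// Returns the ids the consumer received, and whether a request created after
/// the channel closed was still able to log.
fn demo() -> (Vec<u32>, bool) {
    let (tx, rx): (Sender<u32>, Receiver<u32>) = channel();
    let owner = Arc::new(tx);
    let log_chan: Weak<Sender<u32>> = Arc::downgrade(&owner);

    // The consumer drains the channel until every sender has been dropped.
    let consumer = thread::spawn(move || rx.iter().collect::<Vec<u32>>());

    let first = Request::new(1, &log_chan);
    drop(owner); // begin shutdown; `first` still holds the channel open
    let second = Request::new(2, &log_chan); // upgrade works: a strong sender exists
    drop(first);
    drop(second); // last strong sender gone: the channel closes

    let late_can_log = Request::new(3, &log_chan).sender.is_some();
    let received = consumer.join().expect("consumer thread panicked");
    (received, late_can_log)
}

fn main() {
    let (received, late_can_log) = demo();
    println!("received {received:?}, late request can log: {late_can_log}");
}
```

The benefit of this design is that shutdown does not need to track requests explicitly: the consumer simply runs until the last in-flight request has pushed its record.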