Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-07 21:42:56 +00:00)
proxy: Create disconnect events (#7535)
## Problem

It's not possible to get the duration of a session from proxy events.

## Summary of changes

* Added a separate events folder in S3 to record disconnect events.
* Disconnect events are exactly the same as normal events, but with a non-empty `disconnect_timestamp` field.
* As @oruen suggested, they carry the same information as the original events to avoid potentially heavy joins.
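To make the two-event model concrete, here is a hedged sketch of how a consumer could derive session length. Field names follow the diff below where shown, but the `timestamp` field is assumed here for illustration, and the real `RequestData` stores `chrono::NaiveDateTime` plus many more columns; this is not proxy code. Because a disconnect event duplicates the original event's fields, the duration is a per-row computation with no join:

```rust
use chrono::{DateTime, Duration, Utc};

// Illustrative two-field subset of the event row.
struct RequestData {
    timestamp: DateTime<Utc>,                    // session start (assumed field)
    disconnect_timestamp: Option<DateTime<Utc>>, // set only on disconnect events
}

// Session length from a single disconnect row; connect rows
// (disconnect_timestamp == None) simply yield None.
fn session_duration(event: &RequestData) -> Option<Duration> {
    event.disconnect_timestamp.map(|end| end - event.timestamp)
}
```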
@@ -279,7 +279,7 @@ async fn handle_client(
 
     // doesn't yet matter as pg-sni-router doesn't report analytics logs
     ctx.set_success();
-    ctx.log();
+    ctx.log_connect();
 
     // Starting from here we only proxy the client's traffic.
     info!("performing the proxy pass...");
@@ -20,7 +20,8 @@ use self::parquet::RequestData;
 
 pub mod parquet;
 
-static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
+pub static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
+pub static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
 
 /// Context data for a single request to connect to a database.
 ///
@@ -49,9 +50,12 @@ pub struct RequestMonitoring {
     // extra
     // This sender is here to keep the request monitoring channel open while requests are taking place.
     sender: Option<mpsc::UnboundedSender<RequestData>>,
+    // This sender is only used to log the length of the session in case of success.
+    disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
     pub latency_timer: LatencyTimer,
     // Whether proxy decided that it's not a valid endpoint and rejected it before going to cplane.
     rejected: Option<bool>,
+    disconnect_timestamp: Option<chrono::DateTime<Utc>>,
 }
 
 #[derive(Clone, Debug)]
@@ -100,7 +104,9 @@ impl RequestMonitoring {
             cold_start_info: ColdStartInfo::Unknown,
 
             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
+            disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
             latency_timer: LatencyTimer::new(protocol),
+            disconnect_timestamp: None,
         }
     }
 
@@ -190,11 +196,7 @@ impl RequestMonitoring {
         self.success = true;
     }
 
-    pub fn log(self) {}
-}
-
-impl Drop for RequestMonitoring {
-    fn drop(&mut self) {
+    pub fn log_connect(&mut self) {
         let outcome = if self.success {
             ConnectOutcome::Success
         } else {
@@ -226,4 +228,23 @@ impl Drop for RequestMonitoring {
             let _: Result<(), _> = tx.send(RequestData::from(&*self));
         }
     }
+
+    fn log_disconnect(&mut self) {
+        // If we are here, it's guaranteed that the user successfully connected to the endpoint.
+        // Here we log the length of the session.
+        self.disconnect_timestamp = Some(Utc::now());
+        if let Some(tx) = self.disconnect_sender.take() {
+            let _: Result<(), _> = tx.send(RequestData::from(&*self));
+        }
+    }
+}
+
+impl Drop for RequestMonitoring {
+    fn drop(&mut self) {
+        if self.sender.is_some() {
+            self.log_connect();
+        } else {
+            self.log_disconnect();
+        }
+    }
 }
@@ -19,7 +19,10 @@ use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, Span};
 use utils::backoff;
 
-use crate::config::{remote_storage_from_toml, OptRemoteStorageConfig};
+use crate::{
+    config::{remote_storage_from_toml, OptRemoteStorageConfig},
+    context::LOG_CHAN_DISCONNECT,
+};
 
 use super::{RequestMonitoring, LOG_CHAN};
 
@@ -31,6 +34,9 @@ pub struct ParquetUploadArgs {
     #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
     parquet_upload_remote_storage: OptRemoteStorageConfig,
 
+    #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
+    parquet_upload_disconnect_events_remote_storage: OptRemoteStorageConfig,
+
     /// How many rows to include in a row group
     #[clap(long, default_value_t = 8192)]
     parquet_upload_row_group_size: usize,
@@ -91,6 +97,8 @@ pub struct RequestData {
     /// Tracks time from session start (HTTP request/libpq TCP handshake)
     /// through to success/failure
     duration_us: u64,
+    /// If the session was successful, one more event is created after the disconnect, with `disconnect_timestamp` filled in.
+    disconnect_timestamp: Option<chrono::NaiveDateTime>,
 }
 
 impl From<&RequestMonitoring> for RequestData {
@@ -120,6 +128,7 @@ impl From<&RequestMonitoring> for RequestData {
                 .elapsed()
                 .unwrap_or_default()
                 .as_micros() as u64, // 584 millennia... good enough
+            disconnect_timestamp: value.disconnect_timestamp.map(|x| x.naive_utc()),
         }
     }
 }
@@ -141,8 +150,9 @@ pub async fn worker(
     LOG_CHAN.set(tx.downgrade()).unwrap();
 
     // setup row stream that will close on cancellation
+    let cancellation_token2 = cancellation_token.clone();
     tokio::spawn(async move {
-        cancellation_token.cancelled().await;
+        cancellation_token2.cancelled().await;
         // dropping this sender will cause the channel to close only once
         // all the remaining inflight requests have been completed.
         drop(tx);
@@ -167,9 +177,38 @@ pub async fn worker(
         test_remote_failures: 0,
     };
 
-    worker_inner(storage, rx, parquet_config).await
+    // TODO(anna): consider moving this to a separate function.
+    if let Some(disconnect_events_storage_config) =
+        config.parquet_upload_disconnect_events_remote_storage
+    {
+        let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel();
+        LOG_CHAN_DISCONNECT.set(tx_disconnect.downgrade()).unwrap();
+
+        // setup row stream that will close on cancellation
+        tokio::spawn(async move {
+            cancellation_token.cancelled().await;
+            // dropping this sender will cause the channel to close only once
+            // all the remaining inflight requests have been completed.
+            drop(tx_disconnect);
+        });
+        let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx));
+        let rx_disconnect = rx_disconnect.map(RequestData::from);
+
+        let storage_disconnect =
+            GenericRemoteStorage::from_config(&disconnect_events_storage_config)
+                .context("remote storage for disconnect events init")?;
+        let parquet_config_disconnect = parquet_config.clone();
+        tokio::try_join!(
+            worker_inner(storage, rx, parquet_config),
+            worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect)
+        )
+        .map(|_| ())
+    } else {
+        worker_inner(storage, rx, parquet_config).await
+    }
 }
 
 #[derive(Clone, Debug)]
 struct ParquetConfig {
     propeties: WriterPropertiesPtr,
     rows_per_group: usize,
@@ -452,6 +491,7 @@ mod tests {
             success: rng.gen(),
             cold_start_info: "no",
             duration_us: rng.gen_range(0..30_000_000),
+            disconnect_timestamp: None,
         }
     }
 
@@ -520,15 +560,15 @@ mod tests {
         assert_eq!(
             file_stats,
             [
-                (1314385, 3, 6000),
-                (1314378, 3, 6000),
-                (1314438, 3, 6000),
-                (1314395, 3, 6000),
-                (1314525, 3, 6000),
-                (1314367, 3, 6000),
-                (1314159, 3, 6000),
-                (1314395, 3, 6000),
-                (438352, 1, 2000)
+                (1315008, 3, 6000),
+                (1315001, 3, 6000),
+                (1315061, 3, 6000),
+                (1315018, 3, 6000),
+                (1315148, 3, 6000),
+                (1314990, 3, 6000),
+                (1314782, 3, 6000),
+                (1315018, 3, 6000),
+                (438575, 1, 2000)
             ]
         );
 
@@ -558,11 +598,11 @@ mod tests {
         assert_eq!(
             file_stats,
             [
-                (1220633, 5, 10000),
-                (1226783, 5, 10000),
-                (1228577, 5, 10000),
-                (1227939, 5, 10000),
-                (1219217, 5, 10000)
+                (1221738, 5, 10000),
+                (1227888, 5, 10000),
+                (1229682, 5, 10000),
+                (1229044, 5, 10000),
+                (1220322, 5, 10000)
             ]
         );
 
@@ -594,11 +634,11 @@ mod tests {
        assert_eq!(
            file_stats,
            [
-                (1206280, 5, 10000),
-                (1206011, 5, 10000),
-                (1206304, 5, 10000),
-                (1206292, 5, 10000),
-                (1206547, 5, 10000)
+                (1207385, 5, 10000),
+                (1207116, 5, 10000),
+                (1207409, 5, 10000),
+                (1207397, 5, 10000),
+                (1207652, 5, 10000)
            ]
        );
 
@@ -623,15 +663,15 @@ mod tests {
         assert_eq!(
             file_stats,
             [
-                (1314385, 3, 6000),
-                (1314378, 3, 6000),
-                (1314438, 3, 6000),
-                (1314395, 3, 6000),
-                (1314525, 3, 6000),
-                (1314367, 3, 6000),
-                (1314159, 3, 6000),
-                (1314395, 3, 6000),
-                (438352, 1, 2000)
+                (1315008, 3, 6000),
+                (1315001, 3, 6000),
+                (1315061, 3, 6000),
+                (1315018, 3, 6000),
+                (1315148, 3, 6000),
+                (1314990, 3, 6000),
+                (1314782, 3, 6000),
+                (1315018, 3, 6000),
+                (438575, 1, 2000)
             ]
         );
 
@@ -668,7 +708,7 @@ mod tests {
         // files are smaller than the size threshold, but they took too long to fill so were flushed early
         assert_eq!(
             file_stats,
-            [(658823, 2, 3001), (658537, 2, 3000), (658333, 2, 2999)]
+            [(659240, 2, 3001), (658954, 2, 3000), (658750, 2, 2999)]
         );
 
         tmpdir.close().unwrap();
@@ -132,16 +132,14 @@ pub async fn task_main(
                 Err(e) => {
                     // todo: log and push to ctx the error kind
                     ctx.set_error_kind(e.get_error_kind());
-                    ctx.log();
                     error!(parent: &span, "per-client task finished with an error: {e:#}");
                 }
                 Ok(None) => {
                     ctx.set_success();
-                    ctx.log();
                 }
                 Ok(Some(p)) => {
                     ctx.set_success();
-                    ctx.log();
+                    ctx.log_connect();
                     match p.proxy_pass().instrument(span.clone()).await {
                         Ok(()) => {}
                         Err(e) => {
@@ -156,17 +156,15 @@ pub async fn serve_websocket(
         Err(e) => {
             // todo: log and push to ctx the error kind
             ctx.set_error_kind(e.get_error_kind());
-            ctx.log();
             Err(e.into())
         }
         Ok(None) => {
             ctx.set_success();
-            ctx.log();
             Ok(())
         }
         Ok(Some(p)) => {
             ctx.set_success();
-            ctx.log();
+            ctx.log_connect();
             p.proxy_pass().await
         }
     }
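A note on the channel wiring in the worker hunk above: `LOG_CHAN_DISCONNECT` holds only a *weak* sender, and each `RequestMonitoring` upgrades it at construction time, so the `drop(tx_disconnect)` on cancellation closes the channel once the last in-flight context is gone. A minimal, self-contained sketch of that pattern (assuming `tokio` and `once_cell` as in the diff, with a `String` payload standing in for `RequestData`; not the actual proxy code):

```rust
use once_cell::sync::OnceCell;
use tokio::sync::mpsc;

static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<String>> = OnceCell::new();

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    // The worker publishes a weak handle; it keeps the only strong sender.
    LOG_CHAN_DISCONNECT.set(tx.downgrade()).unwrap();

    // What a request context does: the upgrade succeeds only while the
    // worker still holds its strong sender.
    if let Some(sender) = LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()) {
        let _ = sender.send("disconnect event".to_owned());
    }

    // What the cancellation task does: dropping the strong sender lets the
    // channel close after the last upgraded clone is dropped.
    drop(tx);
    while let Some(event) = rx.recv().await {
        println!("{event}");
    }
}
```

After the strong sender is dropped, `upgrade()` returns `None`, so newly created contexts simply skip sending rather than erroring.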