proxy: add more columns to parquet upload (#6405)

## Problem

Some fields were missed in the initial spec.

## Summary of changes

- Adds a `success` boolean (defaults to false unless the request is explicitly marked as successful).
- Adds a `duration_us` integer tracking the number of microseconds from session start through to request completion.
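
As a rough sketch of the intent (not the actual implementation), the request context keeps a `success` flag that stays false unless explicitly set, and derives `duration_us` from the time elapsed since the session started. The names below (`RequestContext`, `started_at`, `duration_us()`) are simplified placeholders; the real code in the diff lives on `RequestMonitoring` and measures from its `first_packet` timestamp rather than an `Instant`:

```rust
use std::time::Instant;

/// Simplified stand-in for the proxy's per-request context
/// (the real type is `RequestMonitoring`, see the diff below).
struct RequestContext {
    started_at: Instant, // session start: HTTP request / libpq TCP handshake
    success: bool,       // stays false unless explicitly marked successful
}

impl RequestContext {
    fn new() -> Self {
        Self { started_at: Instant::now(), success: false }
    }

    /// Marked once an HTTP response with SQL rows is formed,
    /// or once the connection reaches proxy_pass.
    fn set_success(&mut self) {
        self.success = true;
    }

    /// Microseconds from session start to request completion;
    /// the u64 only overflows after roughly 584 millennia.
    fn duration_us(&self) -> u64 {
        self.started_at.elapsed().as_micros() as u64
    }
}

fn main() {
    let mut ctx = RequestContext::new();
    // ... handle the request ...
    ctx.set_success();
    println!("success={} duration_us={}", ctx.success, ctx.duration_us());
}
```
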
Commit 7e7e9f5191 (parent 760a48207d), authored by Conrad Ludgate and committed via GitHub on 2024-01-20 09:38:11 +00:00. 7 changed files with 84 additions and 43 deletions.

@@ -89,3 +89,4 @@ camino-tempfile.workspace = true
rcgen.workspace = true
rstest.workspace = true
tokio-postgres-rustls.workspace = true
+ walkdir.workspace = true

@@ -32,6 +32,7 @@ pub struct RequestMonitoring {
user: Option<SmolStr>,
application: Option<SmolStr>,
error_kind: Option<ErrorKind>,
+ success: bool,
// extra
// This sender is here to keep the request monitoring channel open while requests are taking place.
@@ -59,6 +60,7 @@ impl RequestMonitoring {
user: None,
application: None,
error_kind: None,
+ success: false,
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
latency_timer: LatencyTimer::new(protocol),
@@ -96,6 +98,10 @@ impl RequestMonitoring {
self.user = Some(user);
}
+ pub fn set_success(&mut self) {
+ self.success = true;
+ }
pub fn log(&mut self) {
if let Some(tx) = self.sender.take() {
let _: Result<(), _> = tx.send(self.clone());

@@ -1,7 +1,8 @@
- use std::sync::Arc;
+ use std::{sync::Arc, time::SystemTime};
use anyhow::Context;
use bytes::BytesMut;
+ use chrono::{Datelike, Timelike};
use futures::{Stream, StreamExt};
use parquet::{
basic::Compression,
@@ -86,6 +87,12 @@ struct RequestData {
project: Option<String>,
branch: Option<String>,
error: Option<&'static str>,
+ /// Success is counted if we form a HTTP response with sql rows inside
+ /// Or if we make it to proxy_pass
+ success: bool,
+ /// Tracks time from session start (HTTP request/libpq TCP handshake)
+ /// Through to success/failure
+ duration_us: u64,
}
impl From<RequestMonitoring> for RequestData {
@@ -102,6 +109,11 @@ impl From<RequestMonitoring> for RequestData {
protocol: value.protocol,
region: value.region,
error: value.error_kind.as_ref().map(|e| e.to_str()),
+ success: value.success,
+ duration_us: SystemTime::from(value.first_packet)
+ .elapsed()
+ .unwrap_or_default()
+ .as_micros() as u64, // 584 millenia... good enough
}
}
}
@@ -266,7 +278,13 @@ async fn upload_parquet(
let compression = len as f64 / len_uncompressed as f64;
let size = data.len();
- let id = uuid::Uuid::now_v7();
+ let now = chrono::Utc::now();
+ let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
+ uuid::NoContext,
+ // we won't be running this in 1970. this cast is ok
+ now.timestamp() as u64,
+ now.timestamp_subsec_nanos(),
+ ));
info!(
%id,
@@ -274,7 +292,14 @@ async fn upload_parquet(
size, compression, "uploading request parquet file"
);
let path = RemotePath::from_string(&format!("requests_{id}.parquet"))?;
let year = now.year();
let month = now.month();
let day = now.day();
let hour = now.hour();
// segment files by time for S3 performance
let path = RemotePath::from_string(&format!(
"{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
))?;
backoff::retry(
|| async {
let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
@@ -332,6 +357,7 @@ mod tests {
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
use tokio::{sync::mpsc, time};
+ use walkdir::WalkDir;
use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
@@ -420,6 +446,8 @@ mod tests {
protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
region: "us-east-1",
error: None,
+ success: rng.gen(),
+ duration_us: rng.gen_range(0..30_000_000),
}
}
@@ -442,9 +470,11 @@ mod tests {
worker_inner(storage, rx, config).await.unwrap();
- let mut files = std::fs::read_dir(tmpdir.as_std_path())
- .unwrap()
- .map(|entry| entry.unwrap().path())
+ let mut files = WalkDir::new(tmpdir.as_std_path())
+ .into_iter()
+ .filter_map(|entry| entry.ok())
+ .filter(|entry| entry.file_type().is_file())
+ .map(|entry| entry.path().to_path_buf())
.collect_vec();
files.sort();
@@ -485,15 +515,15 @@ mod tests {
assert_eq!(
file_stats,
[
- (1029153, 3, 6000),
- (1029075, 3, 6000),
- (1029216, 3, 6000),
- (1029129, 3, 6000),
- (1029250, 3, 6000),
- (1029017, 3, 6000),
- (1029175, 3, 6000),
- (1029247, 3, 6000),
- (343124, 1, 2000)
+ (1087635, 3, 6000),
+ (1087288, 3, 6000),
+ (1087444, 3, 6000),
+ (1087572, 3, 6000),
+ (1087468, 3, 6000),
+ (1087500, 3, 6000),
+ (1087533, 3, 6000),
+ (1087566, 3, 6000),
+ (362671, 1, 2000)
],
);
@@ -523,11 +553,11 @@ mod tests {
assert_eq!(
file_stats,
[
- (1166201, 6, 12000),
- (1163577, 6, 12000),
- (1164641, 6, 12000),
- (1168772, 6, 12000),
- (196761, 1, 2000)
+ (1028637, 5, 10000),
+ (1031969, 5, 10000),
+ (1019900, 5, 10000),
+ (1020365, 5, 10000),
+ (1025010, 5, 10000)
],
);
@@ -559,11 +589,11 @@ mod tests {
assert_eq!(
file_stats,
[
- (1144934, 6, 12000),
- (1144941, 6, 12000),
- (1144735, 6, 12000),
- (1144936, 6, 12000),
- (191035, 1, 2000)
+ (1210770, 6, 12000),
+ (1211036, 6, 12000),
+ (1210990, 6, 12000),
+ (1210861, 6, 12000),
+ (202073, 1, 2000)
],
);
@@ -588,15 +618,15 @@ mod tests {
assert_eq!(
file_stats,
[
- (1029153, 3, 6000),
- (1029075, 3, 6000),
- (1029216, 3, 6000),
- (1029129, 3, 6000),
- (1029250, 3, 6000),
- (1029017, 3, 6000),
- (1029175, 3, 6000),
- (1029247, 3, 6000),
- (343124, 1, 2000)
+ (1087635, 3, 6000),
+ (1087288, 3, 6000),
+ (1087444, 3, 6000),
+ (1087572, 3, 6000),
+ (1087468, 3, 6000),
+ (1087500, 3, 6000),
+ (1087533, 3, 6000),
+ (1087566, 3, 6000),
+ (362671, 1, 2000)
],
);
@@ -633,7 +663,7 @@ mod tests {
// files are smaller than the size threshold, but they took too long to fill so were flushed early
assert_eq!(
file_stats,
- [(515807, 2, 3001), (515585, 2, 3000), (515425, 2, 2999)],
+ [(545264, 2, 3001), (545025, 2, 3000), (544857, 2, 2999)],
);
tmpdir.close().unwrap();
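
The upload-path change above (hour-level key prefixes plus a UUIDv7 file id built from the same `chrono` timestamp) can be sketched in isolation. This is a minimal sketch, not the actual upload code: `parquet_key` is a hypothetical helper name, it assumes the `uuid` crate with the `v7` feature and `chrono`, and it omits the `RemotePath` wrapping and the retrying upload that the real function performs:

```rust
use chrono::{DateTime, Datelike, Timelike, Utc};
use uuid::{NoContext, Timestamp, Uuid};

/// Build the object key for one parquet upload, segmented by hour so that
/// keys are spread across time-based prefixes (the diff's S3 performance note).
fn parquet_key(now: DateTime<Utc>) -> String {
    // UUIDv7 embeds the same wall-clock time; the cast to u64 is fine for
    // any date after 1970, which is the only case that matters here.
    let id = Uuid::new_v7(Timestamp::from_unix(
        NoContext,
        now.timestamp() as u64,
        now.timestamp_subsec_nanos(),
    ));
    format!(
        "{:04}/{:02}/{:02}/{:02}/requests_{id}.parquet",
        now.year(),
        now.month(),
        now.day(),
        now.hour()
    )
}

fn main() {
    // e.g. "2024/01/20/09/requests_<uuid-v7>.parquet"
    println!("{}", parquet_key(Utc::now()));
}
```

Because UUIDv7 ids embed the creation time, files within an hourly prefix also sort roughly in the order they were written.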

@@ -356,6 +356,7 @@ pub async fn proxy_pass(
compute: impl AsyncRead + AsyncWrite + Unpin,
aux: MetricsAuxInfo,
) -> anyhow::Result<()> {
+ ctx.set_success();
ctx.log();
let usage = USAGE_METRICS.register(Ids {

@@ -26,7 +26,7 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus};
use crate::{
auth::{self, backend::ComputeUserInfo, check_peer_addr_is_in_list},
- console,
+ console::{self, messages::MetricsAuxInfo},
context::RequestMonitoring,
metrics::NUM_DB_CONNECTIONS_GAUGE,
proxy::connect_compute::ConnectMechanism,
@@ -362,6 +362,7 @@ impl GlobalConnPool {
// ok return cached connection if found and establish a new one otherwise
let new_client = if let Some(client) = client {
+ ctx.set_project(client.aux.clone());
if client.inner.is_closed() {
let conn_id = uuid::Uuid::new_v4();
info!(%conn_id, "pool: cached connection '{conn_info}' is closed, opening a new one");
@@ -593,10 +594,6 @@ async fn connect_to_compute_once(
span.in_scope(|| {
info!(%conn_info, %session, "new connection");
});
- let ids = Ids {
- endpoint_id: node_info.aux.endpoint_id.clone(),
- branch_id: node_info.aux.branch_id.clone(),
- };
let db_user = conn_info.db_and_user();
tokio::spawn(
@@ -664,7 +661,7 @@ async fn connect_to_compute_once(
Ok(ClientInner {
inner: client,
session: tx,
- ids,
+ aux: node_info.aux.clone(),
conn_id,
})
}
@@ -672,13 +669,17 @@ async fn connect_to_compute_once(
struct ClientInner {
inner: tokio_postgres::Client,
session: tokio::sync::watch::Sender<uuid::Uuid>,
- ids: Ids,
+ aux: MetricsAuxInfo,
conn_id: uuid::Uuid,
}
impl Client {
pub fn metrics(&self) -> Arc<MetricCounter> {
- USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone())
+ let aux = &self.inner.as_ref().unwrap().aux;
+ USAGE_METRICS.register(Ids {
+ endpoint_id: aux.endpoint_id.clone(),
+ branch_id: aux.branch_id.clone(),
+ })
}
}

@@ -497,6 +497,7 @@ async fn handle_inner(
}
};
+ ctx.set_success();
ctx.log();
let metrics = client.metrics();