proxy: Add new metrics (#1132)

This commit is contained in:
bojanserafimov
2022-01-14 19:12:43 -05:00
committed by GitHub
parent 17b7caddcb
commit 8af1b43074

View File

@@ -10,6 +10,7 @@ use std::collections::HashMap;
use std::net::{SocketAddr, TcpStream};
use std::{io, thread};
use tokio_postgres::NoTls;
use zenith_metrics::{new_common_metric_name, register_int_counter, IntCounter};
use zenith_utils::postgres_backend::{self, PostgresBackend, ProtoState, Stream};
use zenith_utils::pq_proto::{BeMessage as Be, FeMessage as Fe, *};
use zenith_utils::sock_split::{ReadStream, WriteStream};
@@ -33,6 +34,24 @@ impl CancelClosure {
lazy_static! {
// Enables serving CancelRequests
static ref CANCEL_MAP: Mutex<HashMap<CancelKeyData, CancelClosure>> = Mutex::new(HashMap::new());
// Metrics
static ref NUM_CONNECTIONS_ACCEPTED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_connections_accepted"),
"Number of TCP client connections accepted."
).unwrap();
static ref NUM_CONNECTIONS_CLOSED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_connections_closed"),
"Number of TCP client connections closed."
).unwrap();
static ref NUM_CONNECTIONS_FAILED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_connections_failed"),
"Number of TCP client connections that closed due to error."
).unwrap();
static ref NUM_BYTES_PROXIED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_bytes_proxied"),
"Number of bytes sent/received between any client and backend."
).unwrap();
}
thread_local! {
@@ -52,6 +71,7 @@ pub fn thread_main(
loop {
let (socket, peer_addr) = listener.accept()?;
println!("accepted connection from {}", peer_addr);
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
socket.set_nodelay(true).unwrap();
// TODO Use a threadpool instead. Maybe use tokio's threadpool by
@@ -61,10 +81,12 @@ pub fn thread_main(
.name("Proxy thread".into())
.spawn(move || {
if let Err(err) = proxy_conn_main(state, socket) {
NUM_CONNECTIONS_FAILED_COUNTER.inc();
println!("error: {}", err);
}
// Clean up CANCEL_MAP.
NUM_CONNECTIONS_CLOSED_COUNTER.inc();
THREAD_CANCEL_KEY_DATA.with(|cell| {
if let Some(cancel_key_data) = cell.get() {
CANCEL_MAP.lock().remove(&cancel_key_data);
@@ -339,6 +361,7 @@ fn proxy(
// so we can afford to lose `res` in case `flush` fails
let res = self.0.write(buf);
if res.is_ok() {
NUM_BYTES_PROXIED_COUNTER.inc_by(buf.len() as u64);
self.flush()?;
}
res