mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
Simplify MeasuredStream a little.
It was generalized so that you could pass a custom function that is called whenever data is flushed. The only use case we have for it was to increment a prometheus counter, so let's dismantle the abstraction, and just pass a prometheus IntCounter to it. Simplifies the code a little bit. If we need the abstraction again in the future, we can always add it back.
This commit is contained in:
@@ -311,16 +311,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
|
||||
.await?;
|
||||
|
||||
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("tx"));
|
||||
let mut client = MeasuredStream::new(stream.into_inner(), |cnt| {
|
||||
// Number of bytes we sent to the client (outbound).
|
||||
m_sent.inc_by(cnt as u64);
|
||||
});
|
||||
let mut client = MeasuredStream::new(stream.into_inner(), m_sent);
|
||||
|
||||
let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("rx"));
|
||||
let mut db = MeasuredStream::new(db.stream, |cnt| {
|
||||
// Number of bytes the client sent to the compute node (inbound).
|
||||
m_recv.inc_by(cnt as u64);
|
||||
});
|
||||
let mut db = MeasuredStream::new(db.stream, m_recv);
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
info!("performing the proxy pass...");
|
||||
|
||||
@@ -228,27 +228,27 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for Stream<S> {
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// This stream tracks all writes and calls user provided
|
||||
/// callback when the underlying stream is flushed.
|
||||
pub struct MeasuredStream<S, W> {
|
||||
/// This stream tracks all writes, and whenever the stream is flushed,
|
||||
/// increments the user-provided counter by the number of bytes flushed.
|
||||
pub struct MeasuredStream<S> {
|
||||
#[pin]
|
||||
stream: S,
|
||||
write_count: usize,
|
||||
inc_write_count: W,
|
||||
write_counter: prometheus::IntCounter,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, W> MeasuredStream<S, W> {
|
||||
pub fn new(stream: S, inc_write_count: W) -> Self {
|
||||
impl<S> MeasuredStream<S> {
|
||||
pub fn new(stream: S, write_counter: prometheus::IntCounter) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
write_count: 0,
|
||||
inc_write_count,
|
||||
write_counter,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + Unpin, W> AsyncRead for MeasuredStream<S, W> {
|
||||
impl<S: AsyncRead + Unpin> AsyncRead for MeasuredStream<S> {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
context: &mut task::Context<'_>,
|
||||
@@ -258,7 +258,7 @@ impl<S: AsyncRead + Unpin, W> AsyncRead for MeasuredStream<S, W> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncWrite + Unpin, W: FnMut(usize)> AsyncWrite for MeasuredStream<S, W> {
|
||||
impl<S: AsyncWrite + Unpin> AsyncWrite for MeasuredStream<S> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
context: &mut task::Context<'_>,
|
||||
@@ -279,7 +279,7 @@ impl<S: AsyncWrite + Unpin, W: FnMut(usize)> AsyncWrite for MeasuredStream<S, W>
|
||||
let this = self.project();
|
||||
this.stream.poll_flush(context).map_ok(|()| {
|
||||
// Call the user provided callback and reset the write count.
|
||||
(this.inc_write_count)(*this.write_count);
|
||||
this.write_counter.inc_by(*this.write_count as u64);
|
||||
*this.write_count = 0;
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user