From cb1b4a12a6e529543919cee1ae8ffdf8ca3c6112 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Tue, 3 Aug 2021 20:32:28 +0300 Subject: [PATCH] Add some prometheus metrics to pageserver The metrics are served by an http endpoint, which is meant to be spawned in a new thread. In the future the endpoint will provide more APIs, but for the time being, we won't bother with proper routing. --- Cargo.lock | 79 ++++++++--- Cargo.toml | 11 +- monitoring/docker-compose.yml | 25 ++++ monitoring/grafana.yaml | 12 ++ monitoring/prometheus.yaml | 5 + pageserver/Cargo.toml | 6 +- pageserver/src/bin/pageserver.rs | 16 +++ pageserver/src/lib.rs | 20 ++- pageserver/src/page_service.rs | 75 ++++++++--- pageserver/src/repository.rs | 1 + pageserver/src/walreceiver.rs | 9 ++ proxy/src/mgmt.rs | 4 +- zenith_metrics/Cargo.toml | 12 ++ zenith_metrics/src/lib.rs | 16 +++ zenith_metrics/src/wrappers.rs | 211 ++++++++++++++++++++++++++++++ zenith_utils/Cargo.toml | 15 ++- zenith_utils/src/http_endpoint.rs | 53 ++++++++ zenith_utils/src/lib.rs | 2 +- 18 files changed, 515 insertions(+), 57 deletions(-) create mode 100644 monitoring/docker-compose.yml create mode 100644 monitoring/grafana.yaml create mode 100644 monitoring/prometheus.yaml create mode 100644 zenith_metrics/Cargo.toml create mode 100644 zenith_metrics/src/lib.rs create mode 100644 zenith_metrics/src/wrappers.rs create mode 100644 zenith_utils/src/http_endpoint.rs diff --git a/Cargo.lock b/Cargo.lock index c8b69e0a7a..cfbeba8a3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "ahash" version = "0.4.7" @@ -1126,6 +1128,7 @@ dependencies = [ "regex", "rocksdb", "rust-s3", + "scopeguard", "serde", "serde_json", "slog", @@ -1140,6 +1143,7 @@ dependencies = [ "toml", "walkdir", "workspace_hack", + "zenith_metrics", "zenith_utils", ] @@ -1230,24 +1234,6 @@ dependencies = [ "tokio-postgres 0.7.1", ] -[[package]] -name = "postgres-protocol" -version = "0.6.1" -source = "git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858#9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" -dependencies = [ - "base64", - "byteorder", - "bytes", - "fallible-iterator", - "hmac", - "lazy_static", - "md-5", - "memchr", - "rand", - "sha2", - "stringprep", -] - [[package]] name = "postgres-protocol" version = "0.6.1" @@ -1267,13 +1253,21 @@ dependencies = [ ] [[package]] -name = "postgres-types" -version = "0.2.1" +name = "postgres-protocol" +version = "0.6.1" source = "git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858#9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" dependencies = [ + "base64", + "byteorder", "bytes", "fallible-iterator", - "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", + "hmac", + "lazy_static", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", ] [[package]] @@ -1287,6 +1281,16 @@ dependencies = [ "postgres-protocol 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "postgres-types" +version = "0.2.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858#9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", +] + [[package]] name = "postgres_ffi" version = "0.1.0" @@ -1334,6 +1338,27 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "prometheus" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5986aa8d62380092d2f50f8b1cdba9cb9b6731ffd4b25b51fd126b6c3e05b99c" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + +[[package]] +name = "protobuf" +version = "2.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db50e77ae196458ccd3dc58a31ea1a90b0698ab1b7928d89f644c25d72070267" + [[package]] name = "proxy" version = "0.1.0" @@ -2401,6 +2426,14 @@ dependencies = [ "zenith_utils", ] +[[package]] +name = "zenith_metrics" +version = "0.1.0" +dependencies = [ + "lazy_static", + "prometheus", +] + [[package]] name = "zenith_utils" version = "0.1.0" @@ -2410,10 +2443,14 @@ dependencies = [ "byteorder", "bytes", "hex-literal", + "hyper", + "lazy_static", "log", "postgres", "rand", "serde", "thiserror", + "tokio", "workspace_hack", + "zenith_metrics", ] diff --git a/Cargo.toml b/Cargo.toml index d317e9a27d..1ac8a9c0b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,14 @@ [workspace] members = [ - "pageserver", - "walkeeper", - "zenith", "control_plane", + "pageserver", "postgres_ffi", - "zenith_utils", + "proxy", + "walkeeper", "workspace_hack", - "proxy" + "zenith", + "zenith_metrics", + "zenith_utils", ] [profile.release] diff --git a/monitoring/docker-compose.yml b/monitoring/docker-compose.yml new file mode 100644 index 0000000000..a3fda0b246 --- /dev/null +++ b/monitoring/docker-compose.yml @@ -0,0 +1,25 @@ +version: "3" +services: + + prometheus: + container_name: prometheus + image: prom/prometheus:latest + volumes: + - ./prometheus.yaml:/etc/prometheus/prometheus.yml + # ports: + # - "9090:9090" + # TODO: find a proper portable solution + network_mode: "host" + + grafana: + image: grafana/grafana:latest + volumes: + - ./grafana.yaml:/etc/grafana/provisioning/datasources/datasources.yaml + environment: + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + - GF_AUTH_DISABLE_LOGIN_FORM=true + # ports: + # - "3000:3000" + # TODO: find a proper portable solution + network_mode: "host" diff --git a/monitoring/grafana.yaml b/monitoring/grafana.yaml new file mode 100644 index 0000000000..eac8879e6c --- /dev/null +++ b/monitoring/grafana.yaml @@ -0,0 +1,12 @@ +apiVersion: 1 + +datasources: +- name: Prometheus + type: prometheus + access: proxy + orgId: 1 + url: http://localhost:9090 + basicAuth: false + isDefault: false + version: 1 + editable: false diff --git a/monitoring/prometheus.yaml b/monitoring/prometheus.yaml new file mode 100644 index 0000000000..ba55d53737 --- /dev/null +++ b/monitoring/prometheus.yaml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: 'default' + scrape_interval: 10s + static_configs: + - targets: ['localhost:9898'] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index f83723cac9..a58c35756b 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -23,8 +23,8 @@ log = "0.4.14" clap = "2.33.0" daemonize = "0.4.1" rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] } -tokio = { version = "1.3.0", features = ["full"] } -tokio-stream = { version = "0.1.4" } +tokio = { version = "1.5.0", features = ["full"] } +tokio-stream = { version = "0.1.5" } postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } @@ -41,7 +41,9 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1" fs_extra = "1.2.0" toml = "0.5" +scopeguard = "1.1.0" postgres_ffi = { path = "../postgres_ffi" } +zenith_metrics = { path = "../zenith_metrics" } zenith_utils = { path = "../zenith_utils" } workspace_hack = { path = "../workspace_hack" } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d31f7dc90d..f40dd00e55 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -18,8 +18,10 @@ use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; use pageserver::{branches, logger, page_cache, page_service, PageServerConf}; +use zenith_utils::http_endpoint; const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000"; +const DEFAULT_HTTP_ENDPOINT_ADDR: &str = "127.0.0.1:9898"; const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); @@ -30,6 +32,7 @@ const DEFAULT_SUPERUSER: &str = "zenith_admin"; #[derive(Serialize, Deserialize)] struct CfgFileParams { listen_addr: Option, + http_endpoint_addr: Option, gc_horizon: Option, gc_period: Option, pg_distrib_dir: Option, @@ -44,6 +47,7 @@ impl CfgFileParams { Self { listen_addr: get_arg("listen"), + http_endpoint_addr: get_arg("http_endpoint"), gc_horizon: get_arg("gc_horizon"), gc_period: get_arg("gc_period"), pg_distrib_dir: get_arg("postgres-distrib"), @@ -55,6 +59,7 @@ impl CfgFileParams { // TODO cleaner way to do this Self { listen_addr: self.listen_addr.or(other.listen_addr), + http_endpoint_addr: self.http_endpoint_addr.or(other.http_endpoint_addr), gc_horizon: self.gc_horizon.or(other.gc_horizon), gc_period: self.gc_period.or(other.gc_period), pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir), @@ -68,6 +73,11 @@ impl CfgFileParams { None => DEFAULT_LISTEN_ADDR.to_owned(), }; + let http_endpoint_addr = match self.http_endpoint_addr.as_ref() { + Some(addr) => addr.clone(), + None => DEFAULT_HTTP_ENDPOINT_ADDR.to_owned(), + }; + let gc_horizon: u64 = match self.gc_horizon.as_ref() { Some(horizon_str) => horizon_str.parse()?, None => DEFAULT_GC_HORIZON, @@ -90,6 +100,7 @@ impl CfgFileParams { daemonize: false, listen_addr, + http_endpoint_addr, gc_horizon, gc_period, @@ -238,6 +249,11 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { } } + // Spawn a new thread for the http endpoint + thread::Builder::new() + .name("Metrics thread".into()) + .spawn(move || http_endpoint::thread_main(conf.http_endpoint_addr.clone()))?; + // Check that we can bind to address before starting threads to simplify shutdown // sequence if port is occupied. info!("Starting pageserver on {}", conf.listen_addr); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 0f8be62b4b..172c3145da 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,12 +1,14 @@ -use hex::FromHex; -use rand::Rng; -use serde::{Deserialize, Serialize}; - use std::fmt; use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; +use hex::FromHex; +use lazy_static::lazy_static; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use zenith_metrics::{register_int_gauge_vec, IntGaugeVec}; + pub mod basebackup; pub mod branches; pub mod logger; @@ -23,10 +25,20 @@ pub mod waldecoder; pub mod walreceiver; pub mod walredo; +lazy_static! { + static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!( + "pageserver_live_connections_count", + "Number of live network connections", + &["pageserver_connection_kind"] + ) + .expect("failed to define a metric"); +} + #[derive(Debug, Clone)] pub struct PageServerConf { pub daemonize: bool, pub listen_addr: String, + pub http_endpoint_addr: String, pub gc_horizon: u64, pub gc_period: Duration, pub superuser: String, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 98fe7e6838..a12d188d3c 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -12,8 +12,10 @@ use anyhow::{anyhow, bail, ensure}; use bytes::{Buf, BufMut, Bytes, BytesMut}; +use lazy_static::lazy_static; use log::*; use regex::Regex; +use zenith_metrics::{HistogramVec, register_histogram_vec}; use std::io::Write; use std::net::TcpListener; use std::str::FromStr; @@ -153,6 +155,15 @@ pub fn thread_main(conf: &'static PageServerConf, listener: TcpListener) -> anyh } fn page_service_conn_main(conf: &'static PageServerConf, socket: TcpStream) -> anyhow::Result<()> { + // Immediately increment the gauge, then create a job to decrement it on thread exit. + // One of the pros of `defer!` is that this will *most probably* + // get called, even in presence of panics. + let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]); + gauge.inc(); + scopeguard::defer! { + gauge.dec(); + } + let mut conn_handler = PageServerHandler::new(conf); let mut pgbackend = PostgresBackend::new(socket, AuthType::Trust)?; pgbackend.run(&mut conn_handler) @@ -163,6 +174,24 @@ struct PageServerHandler { conf: &'static PageServerConf, } +const TIME_BUCKETS: &[f64] = &[ + 0.00001, // 1/100000 s + 0.0001, 0.00015, 0.0002, 0.00025, 0.0003, 0.00035, 0.0005, 0.00075, // 1/10000 s + 0.001, 0.0025, 0.005, 0.0075, // 1/1000 s + 0.01, 0.0125, 0.015, 0.025, 0.05, // 1/100 s + 0.1, // 1/10 s +]; + +lazy_static! { + static ref SMGR_QUERY_TIME: HistogramVec = register_histogram_vec!( + "pageserver_smgr_query_time", + "Time spent on smgr query handling", + &["smgr_query_type"], + TIME_BUCKETS.into() + ) + .expect("failed to define a metric"); +} + impl PageServerHandler { pub fn new(conf: &'static PageServerConf) -> Self { PageServerHandler { conf } @@ -214,7 +243,11 @@ impl PageServerHandler { }; let tag = RelishTag::Relation(rel); - let exist = timeline.get_rel_exists(tag, req.lsn).unwrap_or(false); + let exist = SMGR_QUERY_TIME + .with_label_values(&["get_rel_exists"]) + .observe_closure_duration(|| { + timeline.get_rel_exists(tag, req.lsn).unwrap_or(false) + }); PagestreamBeMessage::Status(PagestreamStatusResponse { ok: exist, @@ -230,7 +263,11 @@ impl PageServerHandler { }; let tag = RelishTag::Relation(rel); - let n_blocks = timeline.get_rel_size(tag, req.lsn).unwrap_or(0); + let n_blocks = SMGR_QUERY_TIME + .with_label_values(&["get_rel_size"]) + .observe_closure_duration(|| { + timeline.get_rel_size(tag, req.lsn).unwrap_or(0) + }); PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks }) } @@ -243,22 +280,26 @@ impl PageServerHandler { }; let tag = RelishTag::Relation(rel); - let read_response = match timeline.get_page_at_lsn(tag, req.blkno, req.lsn) { - Ok(p) => PagestreamReadResponse { - ok: true, - n_blocks: 0, - page: p, - }, - Err(e) => { - const ZERO_PAGE: [u8; 8192] = [0; 8192]; - error!("get_page_at_lsn: {}", e); - PagestreamReadResponse { - ok: false, - n_blocks: 0, - page: Bytes::from_static(&ZERO_PAGE), + let read_response = SMGR_QUERY_TIME + .with_label_values(&["get_page_at_lsn"]) + .observe_closure_duration(|| { + match timeline.get_page_at_lsn(tag, req.blkno, req.lsn) { + Ok(p) => PagestreamReadResponse { + ok: true, + n_blocks: 0, + page: p, + }, + Err(e) => { + const ZERO_PAGE: [u8; 8192] = [0; 8192]; + error!("get_page_at_lsn: {}", e); + PagestreamReadResponse { + ok: false, + n_blocks: 0, + page: Bytes::from_static(&ZERO_PAGE), + } + } } - } - }; + }); PagestreamBeMessage::Read(read_response) } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 80dd2d5d8a..6360907a79 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -295,6 +295,7 @@ mod tests { gc_horizon: 64 * 1024 * 1024, gc_period: Duration::from_secs(10), listen_addr: "127.0.0.1:5430".to_string(), + http_endpoint_addr: "127.0.0.1:9898".to_string(), superuser: "zenith_admin".to_string(), workdir: repo_dir, pg_distrib_dir: "".into(), diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index f3f612bc2f..4da99d0918 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -131,6 +131,15 @@ fn walreceiver_main( let mut rclient = Client::connect(&connect_cfg, NoTls)?; info!("connected!"); + // Immediately increment the gauge, then create a job to decrement it on thread exit. + // One of the pros of `defer!` is that this will *most probably* + // get called, even in presence of panics. + let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]); + gauge.inc(); + scopeguard::defer! { + gauge.dec(); + } + let identify = identify_system(&mut rclient)?; info!("{:?}", identify); let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); diff --git a/proxy/src/mgmt.rs b/proxy/src/mgmt.rs index d82712da9d..fab8d2121c 100644 --- a/proxy/src/mgmt.rs +++ b/proxy/src/mgmt.rs @@ -7,7 +7,7 @@ use anyhow::bail; use bytes::Bytes; use serde::{Deserialize, Serialize}; use zenith_utils::{ - postgres_backend::{self, query_from_cstring, PostgresBackend}, + postgres_backend::{self, query_from_cstring, AuthType, PostgresBackend}, pq_proto::{BeMessage, SINGLE_COL_ROWDESC}, }; @@ -34,7 +34,7 @@ pub fn thread_main(state: &'static ProxyState, listener: TcpListener) -> anyhow: pub fn mgmt_conn_main(state: &'static ProxyState, socket: TcpStream) -> anyhow::Result<()> { let mut conn_handler = MgmtHandler { state }; - let mut pgbackend = PostgresBackend::new(socket, postgres_backend::AuthType::Trust)?; + let mut pgbackend = PostgresBackend::new(socket, AuthType::Trust)?; pgbackend.run(&mut conn_handler) } diff --git a/zenith_metrics/Cargo.toml b/zenith_metrics/Cargo.toml new file mode 100644 index 0000000000..bf605dd7c7 --- /dev/null +++ b/zenith_metrics/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "zenith_metrics" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +prometheus = "0.12" + +[dev-dependencies] +lazy_static = "1.4.0" diff --git a/zenith_metrics/src/lib.rs b/zenith_metrics/src/lib.rs new file mode 100644 index 0000000000..0300010362 --- /dev/null +++ b/zenith_metrics/src/lib.rs @@ -0,0 +1,16 @@ +//! We re-export those from prometheus crate to +//! make sure that we use the same dep version everywhere. +//! Otherwise, we might not see all metrics registered via +//! a default registry. +pub use prometheus::gather; +pub use prometheus::{Encoder, TextEncoder}; +pub use prometheus::{linear_buckets, exponential_buckets}; +pub use prometheus::{register_histogram, Histogram}; +pub use prometheus::{register_histogram_vec, HistogramVec}; +pub use prometheus::{register_int_counter, IntCounter}; +pub use prometheus::{register_int_counter_vec, IntCounterVec}; +pub use prometheus::{register_int_gauge, IntGauge}; +pub use prometheus::{register_int_gauge_vec, IntGaugeVec}; + +mod wrappers; +pub use wrappers::{CountedReader, CountedWriter}; diff --git a/zenith_metrics/src/wrappers.rs b/zenith_metrics/src/wrappers.rs new file mode 100644 index 0000000000..bdadd5c209 --- /dev/null +++ b/zenith_metrics/src/wrappers.rs @@ -0,0 +1,211 @@ +use std::io::{Result, Read, Write}; + +/// A wrapper for an object implementing [Read](std::io::Read) +/// which allows a closure to observe the amount of bytes read. +/// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)). +/// +/// Example: +/// +/// ``` +/// # use std::io::{Result, Read}; +/// # use zenith_metrics::{register_int_counter, IntCounter}; +/// # use zenith_metrics::CountedReader; +/// # +/// # lazy_static::lazy_static! { +/// # static ref INT_COUNTER: IntCounter = register_int_counter!( +/// # "int_counter", +/// # "let's count something!" +/// # ).unwrap(); +/// # } +/// # +/// fn do_some_reads(stream: impl Read, count: usize) -> Result> { +/// let mut reader = CountedReader::new(stream, |cnt| { +/// // bump a counter each time we do a read +/// INT_COUNTER.inc_by(cnt as u64); +/// }); +/// +/// let mut proto_header = [0; 8]; +/// reader.read_exact(&mut proto_header)?; +/// assert!(&proto_header == b"deadbeef"); +/// +/// let mut payload = vec![0; count]; +/// reader.read_exact(&mut payload)?; +/// Ok(payload) +/// } +/// ``` +/// +/// NB: rapid concurrent bumping of an atomic counter might incur +/// a performance penalty. Please make sure to amortize the amount +/// of atomic operations by either using [BufReader](std::io::BufReader) +/// or choosing a non-atomic (thread local) counter. +pub struct CountedReader<'a, T> { + reader: T, + update_counter: Box, +} + +impl<'a, T> CountedReader<'a, T> { + pub fn new(reader: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self { + Self { + reader, + update_counter: Box::new(update_counter), + } + } + + /// Get an immutable reference to the underlying [Read](std::io::Read) implementor + pub fn inner(&self) -> &T { + &self.reader + } + + /// Get a mutable reference to the underlying [Read](std::io::Read) implementor + pub fn inner_mut(&mut self) -> &mut T { + &mut self.reader + } + + /// Consume the wrapper and return the underlying [Read](std::io::Read) implementor + pub fn into_inner(self) -> T { + self.reader + } +} + +impl Read for CountedReader<'_, T> { + fn read(&mut self, buf: &mut [u8]) -> Result { + let count = self.reader.read(buf)?; + (self.update_counter)(count); + Ok(count) + } +} + +/// A wrapper for an object implementing [Write](std::io::Write) +/// which allows a closure to observe the amount of bytes written. +/// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)). +/// +/// Example: +/// +/// ``` +/// # use std::io::{Result, Write}; +/// # use zenith_metrics::{register_int_counter, IntCounter}; +/// # use zenith_metrics::CountedWriter; +/// # +/// # lazy_static::lazy_static! { +/// # static ref INT_COUNTER: IntCounter = register_int_counter!( +/// # "int_counter", +/// # "let's count something!" +/// # ).unwrap(); +/// # } +/// # +/// fn do_some_writes(stream: impl Write, payload: &[u8]) -> Result<()> { +/// let mut writer = CountedWriter::new(stream, |cnt| { +/// // bump a counter each time we do a write +/// INT_COUNTER.inc_by(cnt as u64); +/// }); +/// +/// let proto_header = b"deadbeef"; +/// writer.write_all(proto_header)?; +/// writer.write_all(payload) +/// } +/// ``` +/// +/// NB: rapid concurrent bumping of an atomic counter might incur +/// a performance penalty. Please make sure to amortize the amount +/// of atomic operations by either using [BufWriter](std::io::BufWriter) +/// or choosing a non-atomic (thread local) counter. +pub struct CountedWriter<'a, T> { + writer: T, + update_counter: Box, +} + +impl<'a, T> CountedWriter<'a, T> { + pub fn new(writer: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self { + Self { + writer, + update_counter: Box::new(update_counter), + } + } + + /// Get an immutable reference to the underlying [Write](std::io::Write) implementor + pub fn inner(&self) -> &T { + &self.writer + } + + /// Get a mutable reference to the underlying [Write](std::io::Write) implementor + pub fn inner_mut(&mut self) -> &mut T { + &mut self.writer + } + + /// Consume the wrapper and return the underlying [Write](std::io::Write) implementor + pub fn into_inner(self) -> T { + self.writer + } +} + +impl Write for CountedWriter<'_, T> { + fn write(&mut self, buf: &[u8]) -> Result { + let count = self.writer.write(buf)?; + (self.update_counter)(count); + Ok(count) + } + + fn flush(&mut self) -> Result<()> { + self.writer.flush() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_counted_reader() { + let stream = [0; 16]; + let mut total = 0; + let mut reader = CountedReader::new(stream.as_ref(), |cnt| { + total += cnt; + }); + + let mut buffer = [0; 8]; + reader.read_exact(&mut buffer).unwrap(); + reader.read_exact(&mut buffer).unwrap(); + + drop(reader); + assert_eq!(total, stream.len()); + } + + #[test] + fn test_counted_writer() { + let mut stream = [0; 16]; + let mut total = 0; + let mut writer = CountedWriter::new(stream.as_mut(), |cnt| { + total += cnt; + }); + + let buffer = [0; 8]; + writer.write_all(&buffer).unwrap(); + writer.write_all(&buffer).unwrap(); + + drop(writer); + assert_eq!(total, stream.len()); + } + + // This mimicks the constraints of std::thread::spawn + fn assert_send_sync(_x: impl Sync + Send + 'static) {} + + #[test] + fn test_send_sync_counted_reader() { + let stream: &[u8] = &[]; + let mut reader = CountedReader::new(stream, |_| {}); + + assert_send_sync(move || { + reader.read_exact(&mut []).unwrap(); + }); + } + + #[test] + fn test_send_sync_counted_writer() { + let stream = Vec::::new(); + let mut writer = CountedWriter::new(stream, |_| {}); + + assert_send_sync(move || { + writer.write_all(&[]).unwrap(); + }); + } +} diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml index 3fe2e8b8df..4ec7fc4ac9 100644 --- a/zenith_utils/Cargo.toml +++ b/zenith_utils/Cargo.toml @@ -6,13 +6,18 @@ edition = "2018" [dependencies] anyhow = "1.0" -bytes = "1.0.1" -byteorder = "1.4.3" -log = "0.4.14" -serde = { version = "1.0", features = ["derive"] } bincode = "1.3" -thiserror = "1.0" +byteorder = "1.4.3" +bytes = "1.0.1" +hyper = { version = "0.14.7", features = ["server"] } +lazy_static = "1.4.0" +log = "0.4.14" postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +serde = { version = "1.0", features = ["derive"] } +thiserror = "1.0" +tokio = { version = "1.5.0", features = ["full"] } + +zenith_metrics = { path = "../zenith_metrics" } workspace_hack = { path = "../workspace_hack" } rand = "0.8.3" diff --git a/zenith_utils/src/http_endpoint.rs b/zenith_utils/src/http_endpoint.rs new file mode 100644 index 0000000000..5d0f5d6283 --- /dev/null +++ b/zenith_utils/src/http_endpoint.rs @@ -0,0 +1,53 @@ +use hyper::{ + header::CONTENT_TYPE, + service::{make_service_fn, service_fn}, + Body, Request, Response, Server, +}; +use lazy_static::lazy_static; +use zenith_metrics::{Encoder, TextEncoder}; +use zenith_metrics::{register_int_counter, IntCounter}; + +lazy_static! { + static ref SERVE_METRICS_COUNT: IntCounter = register_int_counter!( + "pageserver_serve_metrics_count", + "Number of metric requests made" + ) + .expect("failed to define a metric"); +} + +async fn serve_prometheus_metrics(_req: Request) -> anyhow::Result> { + SERVE_METRICS_COUNT.inc(); + + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metrics = zenith_metrics::gather(); + encoder.encode(&metrics, &mut buffer).unwrap(); + + let response = Response::builder() + .status(200) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap(); + + Ok(response) +} + +pub fn thread_main(addr: String) -> anyhow::Result<()> { + let addr = addr.parse()?; + log::info!("Starting a prometheus endoint at {}", addr); + + // Enter a single-threaded tokio runtime bound to the current thread + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + let _guard = runtime.enter(); + + // TODO: use hyper_router/routerify/etc when we have more methods + let server = Server::bind(&addr).serve(make_service_fn(|_| async { + Ok::<_, anyhow::Error>(service_fn(serve_prometheus_metrics)) + })); + + runtime.block_on(server)?; + + Ok(()) +} diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index 5d867b7dce..ca7cac91c4 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -10,7 +10,7 @@ pub mod seqwait; // pub mod seqwait_async; pub mod bin_ser; - +pub mod http_endpoint; pub mod postgres_backend; pub mod pq_proto;