mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 10:00:38 +00:00
Add some prometheus metrics to pageserver
The metrics are served by an http endpoint, which is meant to be spawned in a new thread. In the future the endpoint will provide more APIs, but for the time being, we won't bother with proper routing.
This commit is contained in:
79
Cargo.lock
generated
79
Cargo.lock
generated
@@ -1,5 +1,7 @@
|
||||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.4.7"
|
||||
@@ -1126,6 +1128,7 @@ dependencies = [
|
||||
"regex",
|
||||
"rocksdb",
|
||||
"rust-s3",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"slog",
|
||||
@@ -1140,6 +1143,7 @@ dependencies = [
|
||||
"toml",
|
||||
"walkdir",
|
||||
"workspace_hack",
|
||||
"zenith_metrics",
|
||||
"zenith_utils",
|
||||
]
|
||||
|
||||
@@ -1230,24 +1234,6 @@ dependencies = [
|
||||
"tokio-postgres 0.7.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.1"
|
||||
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858#9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"hmac",
|
||||
"lazy_static",
|
||||
"md-5",
|
||||
"memchr",
|
||||
"rand",
|
||||
"sha2",
|
||||
"stringprep",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.1"
|
||||
@@ -1267,13 +1253,21 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.1"
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.1"
|
||||
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858#9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)",
|
||||
"hmac",
|
||||
"lazy_static",
|
||||
"md-5",
|
||||
"memchr",
|
||||
"rand",
|
||||
"sha2",
|
||||
"stringprep",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1287,6 +1281,16 @@ dependencies = [
|
||||
"postgres-protocol 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.1"
|
||||
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858#9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres_ffi"
|
||||
version = "0.1.0"
|
||||
@@ -1334,6 +1338,27 @@ dependencies = [
|
||||
"unicode-xid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prometheus"
|
||||
version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5986aa8d62380092d2f50f8b1cdba9cb9b6731ffd4b25b51fd126b6c3e05b99c"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"fnv",
|
||||
"lazy_static",
|
||||
"memchr",
|
||||
"parking_lot",
|
||||
"protobuf",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf"
|
||||
version = "2.24.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db50e77ae196458ccd3dc58a31ea1a90b0698ab1b7928d89f644c25d72070267"
|
||||
|
||||
[[package]]
|
||||
name = "proxy"
|
||||
version = "0.1.0"
|
||||
@@ -2401,6 +2426,14 @@ dependencies = [
|
||||
"zenith_utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zenith_metrics"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"prometheus",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zenith_utils"
|
||||
version = "0.1.0"
|
||||
@@ -2410,10 +2443,14 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"hex-literal",
|
||||
"hyper",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"postgres",
|
||||
"rand",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"workspace_hack",
|
||||
"zenith_metrics",
|
||||
]
|
||||
|
||||
11
Cargo.toml
11
Cargo.toml
@@ -1,13 +1,14 @@
|
||||
[workspace]
|
||||
members = [
|
||||
"pageserver",
|
||||
"walkeeper",
|
||||
"zenith",
|
||||
"control_plane",
|
||||
"pageserver",
|
||||
"postgres_ffi",
|
||||
"zenith_utils",
|
||||
"proxy",
|
||||
"walkeeper",
|
||||
"workspace_hack",
|
||||
"proxy"
|
||||
"zenith",
|
||||
"zenith_metrics",
|
||||
"zenith_utils",
|
||||
]
|
||||
|
||||
[profile.release]
|
||||
|
||||
25
monitoring/docker-compose.yml
Normal file
25
monitoring/docker-compose.yml
Normal file
@@ -0,0 +1,25 @@
|
||||
version: "3"
|
||||
services:
|
||||
|
||||
prometheus:
|
||||
container_name: prometheus
|
||||
image: prom/prometheus:latest
|
||||
volumes:
|
||||
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
|
||||
# ports:
|
||||
# - "9090:9090"
|
||||
# TODO: find a proper portable solution
|
||||
network_mode: "host"
|
||||
|
||||
grafana:
|
||||
image: grafana/grafana:latest
|
||||
volumes:
|
||||
- ./grafana.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
|
||||
environment:
|
||||
- GF_AUTH_ANONYMOUS_ENABLED=true
|
||||
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
|
||||
- GF_AUTH_DISABLE_LOGIN_FORM=true
|
||||
# ports:
|
||||
# - "3000:3000"
|
||||
# TODO: find a proper portable solution
|
||||
network_mode: "host"
|
||||
12
monitoring/grafana.yaml
Normal file
12
monitoring/grafana.yaml
Normal file
@@ -0,0 +1,12 @@
|
||||
apiVersion: 1
|
||||
|
||||
datasources:
|
||||
- name: Prometheus
|
||||
type: prometheus
|
||||
access: proxy
|
||||
orgId: 1
|
||||
url: http://localhost:9090
|
||||
basicAuth: false
|
||||
isDefault: false
|
||||
version: 1
|
||||
editable: false
|
||||
5
monitoring/prometheus.yaml
Normal file
5
monitoring/prometheus.yaml
Normal file
@@ -0,0 +1,5 @@
|
||||
scrape_configs:
|
||||
- job_name: 'default'
|
||||
scrape_interval: 10s
|
||||
static_configs:
|
||||
- targets: ['localhost:9898']
|
||||
@@ -23,8 +23,8 @@ log = "0.4.14"
|
||||
clap = "2.33.0"
|
||||
daemonize = "0.4.1"
|
||||
rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] }
|
||||
tokio = { version = "1.3.0", features = ["full"] }
|
||||
tokio-stream = { version = "0.1.4" }
|
||||
tokio = { version = "1.5.0", features = ["full"] }
|
||||
tokio-stream = { version = "0.1.5" }
|
||||
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
@@ -41,7 +41,9 @@ serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
fs_extra = "1.2.0"
|
||||
toml = "0.5"
|
||||
scopeguard = "1.1.0"
|
||||
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
zenith_metrics = { path = "../zenith_metrics" }
|
||||
zenith_utils = { path = "../zenith_utils" }
|
||||
workspace_hack = { path = "../workspace_hack" }
|
||||
|
||||
@@ -18,8 +18,10 @@ use clap::{App, Arg, ArgMatches};
|
||||
use daemonize::Daemonize;
|
||||
|
||||
use pageserver::{branches, logger, page_cache, page_service, PageServerConf};
|
||||
use zenith_utils::http_endpoint;
|
||||
|
||||
const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000";
|
||||
const DEFAULT_HTTP_ENDPOINT_ADDR: &str = "127.0.0.1:9898";
|
||||
|
||||
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
|
||||
@@ -30,6 +32,7 @@ const DEFAULT_SUPERUSER: &str = "zenith_admin";
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct CfgFileParams {
|
||||
listen_addr: Option<String>,
|
||||
http_endpoint_addr: Option<String>,
|
||||
gc_horizon: Option<String>,
|
||||
gc_period: Option<String>,
|
||||
pg_distrib_dir: Option<String>,
|
||||
@@ -44,6 +47,7 @@ impl CfgFileParams {
|
||||
|
||||
Self {
|
||||
listen_addr: get_arg("listen"),
|
||||
http_endpoint_addr: get_arg("http_endpoint"),
|
||||
gc_horizon: get_arg("gc_horizon"),
|
||||
gc_period: get_arg("gc_period"),
|
||||
pg_distrib_dir: get_arg("postgres-distrib"),
|
||||
@@ -55,6 +59,7 @@ impl CfgFileParams {
|
||||
// TODO cleaner way to do this
|
||||
Self {
|
||||
listen_addr: self.listen_addr.or(other.listen_addr),
|
||||
http_endpoint_addr: self.http_endpoint_addr.or(other.http_endpoint_addr),
|
||||
gc_horizon: self.gc_horizon.or(other.gc_horizon),
|
||||
gc_period: self.gc_period.or(other.gc_period),
|
||||
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
|
||||
@@ -68,6 +73,11 @@ impl CfgFileParams {
|
||||
None => DEFAULT_LISTEN_ADDR.to_owned(),
|
||||
};
|
||||
|
||||
let http_endpoint_addr = match self.http_endpoint_addr.as_ref() {
|
||||
Some(addr) => addr.clone(),
|
||||
None => DEFAULT_HTTP_ENDPOINT_ADDR.to_owned(),
|
||||
};
|
||||
|
||||
let gc_horizon: u64 = match self.gc_horizon.as_ref() {
|
||||
Some(horizon_str) => horizon_str.parse()?,
|
||||
None => DEFAULT_GC_HORIZON,
|
||||
@@ -90,6 +100,7 @@ impl CfgFileParams {
|
||||
daemonize: false,
|
||||
|
||||
listen_addr,
|
||||
http_endpoint_addr,
|
||||
gc_horizon,
|
||||
gc_period,
|
||||
|
||||
@@ -238,6 +249,11 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn a new thread for the http endpoint
|
||||
thread::Builder::new()
|
||||
.name("Metrics thread".into())
|
||||
.spawn(move || http_endpoint::thread_main(conf.http_endpoint_addr.clone()))?;
|
||||
|
||||
// Check that we can bind to address before starting threads to simplify shutdown
|
||||
// sequence if port is occupied.
|
||||
info!("Starting pageserver on {}", conf.listen_addr);
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
use hex::FromHex;
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use std::fmt;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use hex::FromHex;
|
||||
use lazy_static::lazy_static;
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};
|
||||
|
||||
pub mod basebackup;
|
||||
pub mod branches;
|
||||
pub mod logger;
|
||||
@@ -23,10 +25,20 @@ pub mod waldecoder;
|
||||
pub mod walreceiver;
|
||||
pub mod walredo;
|
||||
|
||||
lazy_static! {
|
||||
static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!(
|
||||
"pageserver_live_connections_count",
|
||||
"Number of live network connections",
|
||||
&["pageserver_connection_kind"]
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PageServerConf {
|
||||
pub daemonize: bool,
|
||||
pub listen_addr: String,
|
||||
pub http_endpoint_addr: String,
|
||||
pub gc_horizon: u64,
|
||||
pub gc_period: Duration,
|
||||
pub superuser: String,
|
||||
|
||||
@@ -12,8 +12,10 @@
|
||||
|
||||
use anyhow::{anyhow, bail, ensure};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use regex::Regex;
|
||||
use zenith_metrics::{HistogramVec, register_histogram_vec};
|
||||
use std::io::Write;
|
||||
use std::net::TcpListener;
|
||||
use std::str::FromStr;
|
||||
@@ -153,6 +155,15 @@ pub fn thread_main(conf: &'static PageServerConf, listener: TcpListener) -> anyh
|
||||
}
|
||||
|
||||
fn page_service_conn_main(conf: &'static PageServerConf, socket: TcpStream) -> anyhow::Result<()> {
|
||||
// Immediately increment the gauge, then create a job to decrement it on thread exit.
|
||||
// One of the pros of `defer!` is that this will *most probably*
|
||||
// get called, even in presence of panics.
|
||||
let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
|
||||
gauge.inc();
|
||||
scopeguard::defer! {
|
||||
gauge.dec();
|
||||
}
|
||||
|
||||
let mut conn_handler = PageServerHandler::new(conf);
|
||||
let mut pgbackend = PostgresBackend::new(socket, AuthType::Trust)?;
|
||||
pgbackend.run(&mut conn_handler)
|
||||
@@ -163,6 +174,24 @@ struct PageServerHandler {
|
||||
conf: &'static PageServerConf,
|
||||
}
|
||||
|
||||
const TIME_BUCKETS: &[f64] = &[
|
||||
0.00001, // 1/100000 s
|
||||
0.0001, 0.00015, 0.0002, 0.00025, 0.0003, 0.00035, 0.0005, 0.00075, // 1/10000 s
|
||||
0.001, 0.0025, 0.005, 0.0075, // 1/1000 s
|
||||
0.01, 0.0125, 0.015, 0.025, 0.05, // 1/100 s
|
||||
0.1, // 1/10 s
|
||||
];
|
||||
|
||||
lazy_static! {
|
||||
static ref SMGR_QUERY_TIME: HistogramVec = register_histogram_vec!(
|
||||
"pageserver_smgr_query_time",
|
||||
"Time spent on smgr query handling",
|
||||
&["smgr_query_type"],
|
||||
TIME_BUCKETS.into()
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
pub fn new(conf: &'static PageServerConf) -> Self {
|
||||
PageServerHandler { conf }
|
||||
@@ -214,7 +243,11 @@ impl PageServerHandler {
|
||||
};
|
||||
let tag = RelishTag::Relation(rel);
|
||||
|
||||
let exist = timeline.get_rel_exists(tag, req.lsn).unwrap_or(false);
|
||||
let exist = SMGR_QUERY_TIME
|
||||
.with_label_values(&["get_rel_exists"])
|
||||
.observe_closure_duration(|| {
|
||||
timeline.get_rel_exists(tag, req.lsn).unwrap_or(false)
|
||||
});
|
||||
|
||||
PagestreamBeMessage::Status(PagestreamStatusResponse {
|
||||
ok: exist,
|
||||
@@ -230,7 +263,11 @@ impl PageServerHandler {
|
||||
};
|
||||
let tag = RelishTag::Relation(rel);
|
||||
|
||||
let n_blocks = timeline.get_rel_size(tag, req.lsn).unwrap_or(0);
|
||||
let n_blocks = SMGR_QUERY_TIME
|
||||
.with_label_values(&["get_rel_size"])
|
||||
.observe_closure_duration(|| {
|
||||
timeline.get_rel_size(tag, req.lsn).unwrap_or(0)
|
||||
});
|
||||
|
||||
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
|
||||
}
|
||||
@@ -243,22 +280,26 @@ impl PageServerHandler {
|
||||
};
|
||||
let tag = RelishTag::Relation(rel);
|
||||
|
||||
let read_response = match timeline.get_page_at_lsn(tag, req.blkno, req.lsn) {
|
||||
Ok(p) => PagestreamReadResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
page: p,
|
||||
},
|
||||
Err(e) => {
|
||||
const ZERO_PAGE: [u8; 8192] = [0; 8192];
|
||||
error!("get_page_at_lsn: {}", e);
|
||||
PagestreamReadResponse {
|
||||
ok: false,
|
||||
n_blocks: 0,
|
||||
page: Bytes::from_static(&ZERO_PAGE),
|
||||
let read_response = SMGR_QUERY_TIME
|
||||
.with_label_values(&["get_page_at_lsn"])
|
||||
.observe_closure_duration(|| {
|
||||
match timeline.get_page_at_lsn(tag, req.blkno, req.lsn) {
|
||||
Ok(p) => PagestreamReadResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
page: p,
|
||||
},
|
||||
Err(e) => {
|
||||
const ZERO_PAGE: [u8; 8192] = [0; 8192];
|
||||
error!("get_page_at_lsn: {}", e);
|
||||
PagestreamReadResponse {
|
||||
ok: false,
|
||||
n_blocks: 0,
|
||||
page: Bytes::from_static(&ZERO_PAGE),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
PagestreamBeMessage::Read(read_response)
|
||||
}
|
||||
|
||||
@@ -295,6 +295,7 @@ mod tests {
|
||||
gc_horizon: 64 * 1024 * 1024,
|
||||
gc_period: Duration::from_secs(10),
|
||||
listen_addr: "127.0.0.1:5430".to_string(),
|
||||
http_endpoint_addr: "127.0.0.1:9898".to_string(),
|
||||
superuser: "zenith_admin".to_string(),
|
||||
workdir: repo_dir,
|
||||
pg_distrib_dir: "".into(),
|
||||
|
||||
@@ -131,6 +131,15 @@ fn walreceiver_main(
|
||||
let mut rclient = Client::connect(&connect_cfg, NoTls)?;
|
||||
info!("connected!");
|
||||
|
||||
// Immediately increment the gauge, then create a job to decrement it on thread exit.
|
||||
// One of the pros of `defer!` is that this will *most probably*
|
||||
// get called, even in presence of panics.
|
||||
let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
|
||||
gauge.inc();
|
||||
scopeguard::defer! {
|
||||
gauge.dec();
|
||||
}
|
||||
|
||||
let identify = identify_system(&mut rclient)?;
|
||||
info!("{:?}", identify);
|
||||
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
|
||||
|
||||
@@ -7,7 +7,7 @@ use anyhow::bail;
|
||||
use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use zenith_utils::{
|
||||
postgres_backend::{self, query_from_cstring, PostgresBackend},
|
||||
postgres_backend::{self, query_from_cstring, AuthType, PostgresBackend},
|
||||
pq_proto::{BeMessage, SINGLE_COL_ROWDESC},
|
||||
};
|
||||
|
||||
@@ -34,7 +34,7 @@ pub fn thread_main(state: &'static ProxyState, listener: TcpListener) -> anyhow:
|
||||
|
||||
pub fn mgmt_conn_main(state: &'static ProxyState, socket: TcpStream) -> anyhow::Result<()> {
|
||||
let mut conn_handler = MgmtHandler { state };
|
||||
let mut pgbackend = PostgresBackend::new(socket, postgres_backend::AuthType::Trust)?;
|
||||
let mut pgbackend = PostgresBackend::new(socket, AuthType::Trust)?;
|
||||
pgbackend.run(&mut conn_handler)
|
||||
}
|
||||
|
||||
|
||||
12
zenith_metrics/Cargo.toml
Normal file
12
zenith_metrics/Cargo.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "zenith_metrics"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
prometheus = "0.12"
|
||||
|
||||
[dev-dependencies]
|
||||
lazy_static = "1.4.0"
|
||||
16
zenith_metrics/src/lib.rs
Normal file
16
zenith_metrics/src/lib.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
//! We re-export those from prometheus crate to
|
||||
//! make sure that we use the same dep version everywhere.
|
||||
//! Otherwise, we might not see all metrics registered via
|
||||
//! a default registry.
|
||||
pub use prometheus::gather;
|
||||
pub use prometheus::{Encoder, TextEncoder};
|
||||
pub use prometheus::{linear_buckets, exponential_buckets};
|
||||
pub use prometheus::{register_histogram, Histogram};
|
||||
pub use prometheus::{register_histogram_vec, HistogramVec};
|
||||
pub use prometheus::{register_int_counter, IntCounter};
|
||||
pub use prometheus::{register_int_counter_vec, IntCounterVec};
|
||||
pub use prometheus::{register_int_gauge, IntGauge};
|
||||
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
|
||||
|
||||
mod wrappers;
|
||||
pub use wrappers::{CountedReader, CountedWriter};
|
||||
211
zenith_metrics/src/wrappers.rs
Normal file
211
zenith_metrics/src/wrappers.rs
Normal file
@@ -0,0 +1,211 @@
|
||||
use std::io::{Result, Read, Write};
|
||||
|
||||
/// A wrapper for an object implementing [Read](std::io::Read)
|
||||
/// which allows a closure to observe the amount of bytes read.
|
||||
/// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
|
||||
///
|
||||
/// Example:
|
||||
///
|
||||
/// ```
|
||||
/// # use std::io::{Result, Read};
|
||||
/// # use zenith_metrics::{register_int_counter, IntCounter};
|
||||
/// # use zenith_metrics::CountedReader;
|
||||
/// #
|
||||
/// # lazy_static::lazy_static! {
|
||||
/// # static ref INT_COUNTER: IntCounter = register_int_counter!(
|
||||
/// # "int_counter",
|
||||
/// # "let's count something!"
|
||||
/// # ).unwrap();
|
||||
/// # }
|
||||
/// #
|
||||
/// fn do_some_reads(stream: impl Read, count: usize) -> Result<Vec<u8>> {
|
||||
/// let mut reader = CountedReader::new(stream, |cnt| {
|
||||
/// // bump a counter each time we do a read
|
||||
/// INT_COUNTER.inc_by(cnt as u64);
|
||||
/// });
|
||||
///
|
||||
/// let mut proto_header = [0; 8];
|
||||
/// reader.read_exact(&mut proto_header)?;
|
||||
/// assert!(&proto_header == b"deadbeef");
|
||||
///
|
||||
/// let mut payload = vec![0; count];
|
||||
/// reader.read_exact(&mut payload)?;
|
||||
/// Ok(payload)
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// NB: rapid concurrent bumping of an atomic counter might incur
|
||||
/// a performance penalty. Please make sure to amortize the amount
|
||||
/// of atomic operations by either using [BufReader](std::io::BufReader)
|
||||
/// or choosing a non-atomic (thread local) counter.
|
||||
pub struct CountedReader<'a, T> {
|
||||
reader: T,
|
||||
update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
|
||||
}
|
||||
|
||||
impl<'a, T> CountedReader<'a, T> {
|
||||
pub fn new(reader: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
|
||||
Self {
|
||||
reader,
|
||||
update_counter: Box::new(update_counter),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get an immutable reference to the underlying [Read](std::io::Read) implementor
|
||||
pub fn inner(&self) -> &T {
|
||||
&self.reader
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the underlying [Read](std::io::Read) implementor
|
||||
pub fn inner_mut(&mut self) -> &mut T {
|
||||
&mut self.reader
|
||||
}
|
||||
|
||||
/// Consume the wrapper and return the underlying [Read](std::io::Read) implementor
|
||||
pub fn into_inner(self) -> T {
|
||||
self.reader
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Read> Read for CountedReader<'_, T> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
|
||||
let count = self.reader.read(buf)?;
|
||||
(self.update_counter)(count);
|
||||
Ok(count)
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper for an object implementing [Write](std::io::Write)
|
||||
/// which allows a closure to observe the amount of bytes written.
|
||||
/// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
|
||||
///
|
||||
/// Example:
|
||||
///
|
||||
/// ```
|
||||
/// # use std::io::{Result, Write};
|
||||
/// # use zenith_metrics::{register_int_counter, IntCounter};
|
||||
/// # use zenith_metrics::CountedWriter;
|
||||
/// #
|
||||
/// # lazy_static::lazy_static! {
|
||||
/// # static ref INT_COUNTER: IntCounter = register_int_counter!(
|
||||
/// # "int_counter",
|
||||
/// # "let's count something!"
|
||||
/// # ).unwrap();
|
||||
/// # }
|
||||
/// #
|
||||
/// fn do_some_writes(stream: impl Write, payload: &[u8]) -> Result<()> {
|
||||
/// let mut writer = CountedWriter::new(stream, |cnt| {
|
||||
/// // bump a counter each time we do a write
|
||||
/// INT_COUNTER.inc_by(cnt as u64);
|
||||
/// });
|
||||
///
|
||||
/// let proto_header = b"deadbeef";
|
||||
/// writer.write_all(proto_header)?;
|
||||
/// writer.write_all(payload)
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// NB: rapid concurrent bumping of an atomic counter might incur
|
||||
/// a performance penalty. Please make sure to amortize the amount
|
||||
/// of atomic operations by either using [BufWriter](std::io::BufWriter)
|
||||
/// or choosing a non-atomic (thread local) counter.
|
||||
pub struct CountedWriter<'a, T> {
|
||||
writer: T,
|
||||
update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
|
||||
}
|
||||
|
||||
impl<'a, T> CountedWriter<'a, T> {
|
||||
pub fn new(writer: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
|
||||
Self {
|
||||
writer,
|
||||
update_counter: Box::new(update_counter),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get an immutable reference to the underlying [Write](std::io::Write) implementor
|
||||
pub fn inner(&self) -> &T {
|
||||
&self.writer
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the underlying [Write](std::io::Write) implementor
|
||||
pub fn inner_mut(&mut self) -> &mut T {
|
||||
&mut self.writer
|
||||
}
|
||||
|
||||
/// Consume the wrapper and return the underlying [Write](std::io::Write) implementor
|
||||
pub fn into_inner(self) -> T {
|
||||
self.writer
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Write> Write for CountedWriter<'_, T> {
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize> {
|
||||
let count = self.writer.write(buf)?;
|
||||
(self.update_counter)(count);
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> Result<()> {
|
||||
self.writer.flush()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_counted_reader() {
|
||||
let stream = [0; 16];
|
||||
let mut total = 0;
|
||||
let mut reader = CountedReader::new(stream.as_ref(), |cnt| {
|
||||
total += cnt;
|
||||
});
|
||||
|
||||
let mut buffer = [0; 8];
|
||||
reader.read_exact(&mut buffer).unwrap();
|
||||
reader.read_exact(&mut buffer).unwrap();
|
||||
|
||||
drop(reader);
|
||||
assert_eq!(total, stream.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_counted_writer() {
|
||||
let mut stream = [0; 16];
|
||||
let mut total = 0;
|
||||
let mut writer = CountedWriter::new(stream.as_mut(), |cnt| {
|
||||
total += cnt;
|
||||
});
|
||||
|
||||
let buffer = [0; 8];
|
||||
writer.write_all(&buffer).unwrap();
|
||||
writer.write_all(&buffer).unwrap();
|
||||
|
||||
drop(writer);
|
||||
assert_eq!(total, stream.len());
|
||||
}
|
||||
|
||||
// This mimicks the constraints of std::thread::spawn
|
||||
fn assert_send_sync(_x: impl Sync + Send + 'static) {}
|
||||
|
||||
#[test]
|
||||
fn test_send_sync_counted_reader() {
|
||||
let stream: &[u8] = &[];
|
||||
let mut reader = CountedReader::new(stream, |_| {});
|
||||
|
||||
assert_send_sync(move || {
|
||||
reader.read_exact(&mut []).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_sync_counted_writer() {
|
||||
let stream = Vec::<u8>::new();
|
||||
let mut writer = CountedWriter::new(stream, |_| {});
|
||||
|
||||
assert_send_sync(move || {
|
||||
writer.write_all(&[]).unwrap();
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -6,13 +6,18 @@ edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
bytes = "1.0.1"
|
||||
byteorder = "1.4.3"
|
||||
log = "0.4.14"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
bincode = "1.3"
|
||||
thiserror = "1.0"
|
||||
byteorder = "1.4.3"
|
||||
bytes = "1.0.1"
|
||||
hyper = { version = "0.14.7", features = ["server"] }
|
||||
lazy_static = "1.4.0"
|
||||
log = "0.4.14"
|
||||
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1.5.0", features = ["full"] }
|
||||
|
||||
zenith_metrics = { path = "../zenith_metrics" }
|
||||
workspace_hack = { path = "../workspace_hack" }
|
||||
rand = "0.8.3"
|
||||
|
||||
|
||||
53
zenith_utils/src/http_endpoint.rs
Normal file
53
zenith_utils/src/http_endpoint.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
use hyper::{
|
||||
header::CONTENT_TYPE,
|
||||
service::{make_service_fn, service_fn},
|
||||
Body, Request, Response, Server,
|
||||
};
|
||||
use lazy_static::lazy_static;
|
||||
use zenith_metrics::{Encoder, TextEncoder};
|
||||
use zenith_metrics::{register_int_counter, IntCounter};
|
||||
|
||||
lazy_static! {
|
||||
static ref SERVE_METRICS_COUNT: IntCounter = register_int_counter!(
|
||||
"pageserver_serve_metrics_count",
|
||||
"Number of metric requests made"
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
}
|
||||
|
||||
async fn serve_prometheus_metrics(_req: Request<Body>) -> anyhow::Result<Response<Body>> {
|
||||
SERVE_METRICS_COUNT.inc();
|
||||
|
||||
let mut buffer = vec![];
|
||||
let encoder = TextEncoder::new();
|
||||
let metrics = zenith_metrics::gather();
|
||||
encoder.encode(&metrics, &mut buffer).unwrap();
|
||||
|
||||
let response = Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(Body::from(buffer))
|
||||
.unwrap();
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub fn thread_main(addr: String) -> anyhow::Result<()> {
|
||||
let addr = addr.parse()?;
|
||||
log::info!("Starting a prometheus endoint at {}", addr);
|
||||
|
||||
// Enter a single-threaded tokio runtime bound to the current thread
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
let _guard = runtime.enter();
|
||||
|
||||
// TODO: use hyper_router/routerify/etc when we have more methods
|
||||
let server = Server::bind(&addr).serve(make_service_fn(|_| async {
|
||||
Ok::<_, anyhow::Error>(service_fn(serve_prometheus_metrics))
|
||||
}));
|
||||
|
||||
runtime.block_on(server)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -10,7 +10,7 @@ pub mod seqwait;
|
||||
// pub mod seqwait_async;
|
||||
|
||||
pub mod bin_ser;
|
||||
|
||||
pub mod http_endpoint;
|
||||
pub mod postgres_backend;
|
||||
pub mod pq_proto;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user