diff --git a/Cargo.lock b/Cargo.lock index c217dfbebb..e36b462d8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2339,6 +2339,7 @@ dependencies = [ "byteorder", "bytes", "clap", + "const_format", "crc32c", "daemonize", "fs2", @@ -2358,6 +2359,7 @@ dependencies = [ "tokio-stream", "walkdir", "workspace_hack", + "zenith_metrics", "zenith_utils", ] diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index d29d278cdd..c0ce57801d 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -794,12 +794,17 @@ def read_pid(path: Path): return int(path.read_text()) +@dataclass +class WalAcceptorPort: + pg: int + http: int + @dataclass class WalAcceptor: """ An object representing a running wal acceptor daemon. """ wa_bin_path: Path data_dir: Path - port: int + port: WalAcceptorPort num: int # identifier for logging pageserver_port: int auth_token: Optional[str] = None @@ -811,7 +816,8 @@ class WalAcceptor: cmd = [str(self.wa_bin_path)] cmd.extend(["-D", str(self.data_dir)]) - cmd.extend(["-l", f"localhost:{self.port}"]) + cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"]) + cmd.extend(["--listen-http", f"localhost:{self.port.http}"]) cmd.append("--daemonize") cmd.append("--no-sync") # Tell page server it can receive WAL from this WAL safekeeper @@ -868,14 +874,14 @@ class WalAcceptor: # "replication=0" hacks psycopg not to send additional queries # on startup, see https://github.com/psycopg/psycopg2/pull/482 - connstr = f"host=localhost port={self.port} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'" + connstr = f"host=localhost port={self.port.pg} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'" with closing(psycopg2.connect(connstr)) as conn: # server doesn't support transactions conn.autocommit = True with conn.cursor() as cur: request_json = json.dumps(request) - print(f"JSON_CTRL request on port {self.port}: {request_json}") + print(f"JSON_CTRL request on port {self.port.pg}: {request_json}") cur.execute("JSON_CTRL " + request_json) all = cur.fetchall() print(f"JSON_CTRL response: {all[0][0]}") @@ -898,7 +904,10 @@ class WalAcceptorFactory: wa = WalAcceptor( wa_bin_path=self.wa_bin_path, data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num), - port=self.port_distributor.get_port(), + port=WalAcceptorPort( + pg=self.port_distributor.get_port(), + http=self.port_distributor.get_port(), + ), num=wa_num, pageserver_port=self.pageserver_port, auth_token=auth_token, @@ -922,7 +931,7 @@ class WalAcceptorFactory: def get_connstrs(self) -> str: """ Get list of wal acceptor endpoints suitable for wal_acceptors GUC """ - return ','.join(["localhost:{}".format(wa.port) for wa in self.instances]) + return ','.join(["localhost:{}".format(wa.port.pg) for wa in self.instances]) @zenfixture diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index 16790ca214..f31126eb05 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -28,9 +28,11 @@ humantime = "2.1.0" walkdir = "2" serde = { version = "1.0", features = ["derive"] } hex = "0.4.3" +const_format = "0.2.21" # FIXME: 'pageserver' is needed for ZTimelineId. Refactor pageserver = { path = "../pageserver" } postgres_ffi = { path = "../postgres_ffi" } workspace_hack = { path = "../workspace_hack" } +zenith_metrics = { path = "../zenith_metrics" } zenith_utils = { path = "../zenith_utils" } diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index d8a0ab6737..5bd64cb8a6 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -3,18 +3,23 @@ // use anyhow::Result; use clap::{App, Arg}; +use const_format::formatcp; use daemonize::Daemonize; use log::*; use std::env; +use std::net::TcpListener; use std::path::{Path, PathBuf}; use std::thread; +use zenith_utils::http::endpoint; use zenith_utils::logging; +use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; use walkeeper::s3_offload; use walkeeper::wal_service; use walkeeper::WalAcceptorConf; fn main() -> Result<()> { + zenith_metrics::set_common_metrics_prefix("safekeeper"); let arg_matches = App::new("Zenith wal_acceptor") .about("Store WAL stream to local file system and push it to WAL receivers") .arg( @@ -25,11 +30,18 @@ fn main() -> Result<()> { .help("Path to the WAL acceptor data directory"), ) .arg( - Arg::with_name("listen") + Arg::with_name("listen-pg") .short("l") - .long("listen") + .long("listen-pg") + .alias("listen") // for compatibility .takes_value(true) - .help("listen for incoming connections on ip:port (default: 127.0.0.1:5454)"), + .help(formatcp!("listen for incoming WAL data connections on ip:port (default: {DEFAULT_PG_LISTEN_ADDR})")), + ) + .arg( + Arg::with_name("listen-http") + .long("listen-http") + .takes_value(true) + .help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")), ) .arg( Arg::with_name("pageserver") @@ -70,7 +82,8 @@ fn main() -> Result<()> { daemonize: false, no_sync: false, pageserver_addr: None, - listen_addr: "localhost:5454".to_string(), + listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(), + listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(), ttl: None, recall_period: None, pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(), @@ -91,8 +104,12 @@ fn main() -> Result<()> { conf.daemonize = true; } - if let Some(addr) = arg_matches.value_of("listen") { - conf.listen_addr = addr.to_owned(); + if let Some(addr) = arg_matches.value_of("listen-pg") { + conf.listen_pg_addr = addr.to_owned(); + } + + if let Some(addr) = arg_matches.value_of("listen-http") { + conf.listen_http_addr = addr.to_owned(); } if let Some(addr) = arg_matches.value_of("pageserver") { @@ -114,6 +131,11 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> { let log_filename = conf.data_dir.join("wal_acceptor.log"); let (_scope_guard, log_file) = logging::init(log_filename, conf.daemonize)?; + let http_listener = TcpListener::bind(conf.listen_http_addr.clone()).map_err(|e| { + error!("failed to bind to address {}: {}", conf.listen_http_addr, e); + e + })?; + if conf.daemonize { info!("daemonizing..."); @@ -136,6 +158,16 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> { let mut threads = Vec::new(); + let http_endpoint_thread = thread::Builder::new() + .name("http_endpoint_thread".into()) + .spawn(move || { + // No authentication at all: read-only metrics only, early stage. + let router = endpoint::make_router(); + endpoint::serve_thread_main(router, http_listener).unwrap(); + }) + .unwrap(); + threads.push(http_endpoint_thread); + if conf.ttl.is_some() { let s3_conf = conf.clone(); let s3_offload_thread = thread::Builder::new() diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index fb04459c47..6c1f70efa2 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -11,12 +11,23 @@ pub mod send_wal; pub mod timeline; pub mod wal_service; +pub mod defaults { + use const_format::formatcp; + + pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454; + pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); + + pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676; + pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); +} + #[derive(Debug, Clone)] pub struct WalAcceptorConf { pub data_dir: PathBuf, pub daemonize: bool, pub no_sync: bool, - pub listen_addr: String, + pub listen_pg_addr: String, + pub listen_http_addr: String, pub pageserver_addr: Option, // TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework pub pageserver_auth_token: Option, diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 4596344b76..527c8d891c 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -42,7 +42,7 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT ); // use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses - let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr); + let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr); let me_conf: Config = me_connstr.parse().unwrap(); let (host, port) = connection_host_port(&me_conf); let callme = format!( diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 8cb629774e..111cbbaf95 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -15,8 +15,11 @@ use std::cmp::min; use std::io; use std::io::Read; +use lazy_static::lazy_static; + use crate::replication::HotStandbyFeedback; use postgres_ffi::xlog_utils::MAX_SEND_SIZE; +use zenith_metrics::{register_gauge_vec, Gauge, GaugeVec}; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; use zenith_utils::pq_proto::SystemId; @@ -279,6 +282,38 @@ pub trait Storage { fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>; } +lazy_static! { + // The prometheus crate does not support u64 yet, i64 only (see `IntGauge`). + // i64 is faster than f64, so update to u64 when available. + static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!( + "safekeeper_flush_lsn", + "Current flush_lsn, grouped by timeline", + &["ztli"] + ) + .expect("Failed to register safekeeper_flush_lsn int gauge vec"); + static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!( + "safekeeper_commit_lsn", + "Current commit_lsn (not necessarily persisted to disk), grouped by timeline", + &["ztli"] + ) + .expect("Failed to register safekeeper_commit_lsn int gauge vec"); +} + +struct SafeKeeperMetrics { + flush_lsn: Gauge, + commit_lsn: Gauge, +} + +impl SafeKeeperMetrics { + fn new(ztli: ZTimelineId) -> SafeKeeperMetrics { + let ztli_str = format!("{}", ztli); + SafeKeeperMetrics { + flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&ztli_str]), + commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&ztli_str]), + } + } +} + /// SafeKeeper which consumes events (messages from compute) and provides /// replies. pub struct SafeKeeper { @@ -286,6 +321,8 @@ pub struct SafeKeeper { /// Established by reading wal. pub flush_lsn: Lsn, pub tli: u32, + // Cached metrics so we don't have to recompute labels on each update. + metrics: Option, /// not-yet-flushed pairs of same named fields in s.* pub commit_lsn: Lsn, pub truncate_lsn: Lsn, @@ -304,6 +341,7 @@ where SafeKeeper { flush_lsn, tli, + metrics: None, commit_lsn: state.commit_lsn, truncate_lsn: state.truncate_lsn, storage, @@ -355,6 +393,8 @@ where self.s.server.wal_seg_size = msg.wal_seg_size; self.storage.persist(&self.s, true)?; + self.metrics = Some(SafeKeeperMetrics::new(self.s.server.ztli)); + info!( "processed greeting from proposer {:?}, sending term {:?}", msg.proposer_id, self.s.acceptor_state.term @@ -478,6 +518,11 @@ where } if last_rec_lsn > self.flush_lsn { self.flush_lsn = last_rec_lsn; + self.metrics + .as_ref() + .unwrap() + .flush_lsn + .set(u64::from(self.flush_lsn) as f64); } // Advance commit_lsn taking into account what we have locally. xxx this @@ -495,6 +540,11 @@ where sync_control_file |= commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn; self.commit_lsn = commit_lsn; + self.metrics + .as_ref() + .unwrap() + .commit_lsn + .set(u64::from(self.commit_lsn) as f64); } self.truncate_lsn = msg.h.truncate_lsn; diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index c77078560c..7730a058ed 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -13,9 +13,9 @@ use zenith_utils::postgres_backend::{AuthType, PostgresBackend}; /// Accept incoming TCP connections and spawn them into a background thread. pub fn thread_main(conf: WalAcceptorConf) -> Result<()> { - info!("Starting wal acceptor on {}", conf.listen_addr); - let listener = TcpListener::bind(conf.listen_addr.clone()).map_err(|e| { - error!("failed to bind to address {}: {}", conf.listen_addr, e); + info!("Starting wal acceptor on {}", conf.listen_pg_addr); + let listener = TcpListener::bind(conf.listen_pg_addr.clone()).map_err(|e| { + error!("failed to bind to address {}: {}", conf.listen_pg_addr, e); e })?; diff --git a/zenith_metrics/src/lib.rs b/zenith_metrics/src/lib.rs index e3c3c81ee7..b01e80d873 100644 --- a/zenith_metrics/src/lib.rs +++ b/zenith_metrics/src/lib.rs @@ -5,6 +5,8 @@ use lazy_static::lazy_static; use once_cell::race::OnceBox; pub use prometheus::{exponential_buckets, linear_buckets}; +pub use prometheus::{register_gauge, Gauge}; +pub use prometheus::{register_gauge_vec, GaugeVec}; pub use prometheus::{register_histogram, Histogram}; pub use prometheus::{register_histogram_vec, HistogramVec}; pub use prometheus::{register_int_counter, IntCounter};