safekeeper: add initial metrics and HTTP handler (#699, #541)

* `wal_acceptor`: add HTTP handler, /metrics endpoint only, no authentication
* Two gauges are currently reported: `flush_lsn` and `commit_lsn`
* Add `DEFAULT_PG_LISTEN_PORT` and `DEFAULT_PG_LISTEN_PORT` consts for uniformity
This commit is contained in:
Egor Suvorov
2021-10-08 18:55:41 +03:00
committed by GitHub
parent b3b8f18f61
commit 403d9779d9
9 changed files with 125 additions and 17 deletions

2
Cargo.lock generated
View File

@@ -2339,6 +2339,7 @@ dependencies = [
"byteorder",
"bytes",
"clap",
"const_format",
"crc32c",
"daemonize",
"fs2",
@@ -2358,6 +2359,7 @@ dependencies = [
"tokio-stream",
"walkdir",
"workspace_hack",
"zenith_metrics",
"zenith_utils",
]

View File

@@ -794,12 +794,17 @@ def read_pid(path: Path):
return int(path.read_text())
@dataclass
class WalAcceptorPort:
pg: int
http: int
@dataclass
class WalAcceptor:
""" An object representing a running wal acceptor daemon. """
wa_bin_path: Path
data_dir: Path
port: int
port: WalAcceptorPort
num: int # identifier for logging
pageserver_port: int
auth_token: Optional[str] = None
@@ -811,7 +816,8 @@ class WalAcceptor:
cmd = [str(self.wa_bin_path)]
cmd.extend(["-D", str(self.data_dir)])
cmd.extend(["-l", f"localhost:{self.port}"])
cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"])
cmd.extend(["--listen-http", f"localhost:{self.port.http}"])
cmd.append("--daemonize")
cmd.append("--no-sync")
# Tell page server it can receive WAL from this WAL safekeeper
@@ -868,14 +874,14 @@ class WalAcceptor:
# "replication=0" hacks psycopg not to send additional queries
# on startup, see https://github.com/psycopg/psycopg2/pull/482
connstr = f"host=localhost port={self.port} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
connstr = f"host=localhost port={self.port.pg} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
with closing(psycopg2.connect(connstr)) as conn:
# server doesn't support transactions
conn.autocommit = True
with conn.cursor() as cur:
request_json = json.dumps(request)
print(f"JSON_CTRL request on port {self.port}: {request_json}")
print(f"JSON_CTRL request on port {self.port.pg}: {request_json}")
cur.execute("JSON_CTRL " + request_json)
all = cur.fetchall()
print(f"JSON_CTRL response: {all[0][0]}")
@@ -898,7 +904,10 @@ class WalAcceptorFactory:
wa = WalAcceptor(
wa_bin_path=self.wa_bin_path,
data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num),
port=self.port_distributor.get_port(),
port=WalAcceptorPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
),
num=wa_num,
pageserver_port=self.pageserver_port,
auth_token=auth_token,
@@ -922,7 +931,7 @@ class WalAcceptorFactory:
def get_connstrs(self) -> str:
""" Get list of wal acceptor endpoints suitable for wal_acceptors GUC """
return ','.join(["localhost:{}".format(wa.port) for wa in self.instances])
return ','.join(["localhost:{}".format(wa.port.pg) for wa in self.instances])
@zenfixture

View File

@@ -28,9 +28,11 @@ humantime = "2.1.0"
walkdir = "2"
serde = { version = "1.0", features = ["derive"] }
hex = "0.4.3"
const_format = "0.2.21"
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
pageserver = { path = "../pageserver" }
postgres_ffi = { path = "../postgres_ffi" }
workspace_hack = { path = "../workspace_hack" }
zenith_metrics = { path = "../zenith_metrics" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -3,18 +3,23 @@
//
use anyhow::Result;
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use log::*;
use std::env;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::thread;
use zenith_utils::http::endpoint;
use zenith_utils::logging;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
let arg_matches = App::new("Zenith wal_acceptor")
.about("Store WAL stream to local file system and push it to WAL receivers")
.arg(
@@ -25,11 +30,18 @@ fn main() -> Result<()> {
.help("Path to the WAL acceptor data directory"),
)
.arg(
Arg::with_name("listen")
Arg::with_name("listen-pg")
.short("l")
.long("listen")
.long("listen-pg")
.alias("listen") // for compatibility
.takes_value(true)
.help("listen for incoming connections on ip:port (default: 127.0.0.1:5454)"),
.help(formatcp!("listen for incoming WAL data connections on ip:port (default: {DEFAULT_PG_LISTEN_ADDR})")),
)
.arg(
Arg::with_name("listen-http")
.long("listen-http")
.takes_value(true)
.help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")),
)
.arg(
Arg::with_name("pageserver")
@@ -70,7 +82,8 @@ fn main() -> Result<()> {
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_addr: "localhost:5454".to_string(),
listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
ttl: None,
recall_period: None,
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
@@ -91,8 +104,12 @@ fn main() -> Result<()> {
conf.daemonize = true;
}
if let Some(addr) = arg_matches.value_of("listen") {
conf.listen_addr = addr.to_owned();
if let Some(addr) = arg_matches.value_of("listen-pg") {
conf.listen_pg_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("listen-http") {
conf.listen_http_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("pageserver") {
@@ -114,6 +131,11 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
let log_filename = conf.data_dir.join("wal_acceptor.log");
let (_scope_guard, log_file) = logging::init(log_filename, conf.daemonize)?;
let http_listener = TcpListener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
e
})?;
if conf.daemonize {
info!("daemonizing...");
@@ -136,6 +158,16 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
let mut threads = Vec::new();
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
// No authentication at all: read-only metrics only, early stage.
let router = endpoint::make_router();
endpoint::serve_thread_main(router, http_listener).unwrap();
})
.unwrap();
threads.push(http_endpoint_thread);
if conf.ttl.is_some() {
let s3_conf = conf.clone();
let s3_offload_thread = thread::Builder::new()

View File

@@ -11,12 +11,23 @@ pub mod send_wal;
pub mod timeline;
pub mod wal_service;
pub mod defaults {
use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
}
#[derive(Debug, Clone)]
pub struct WalAcceptorConf {
pub data_dir: PathBuf,
pub daemonize: bool,
pub no_sync: bool,
pub listen_addr: String,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub pageserver_addr: Option<String>,
// TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework
pub pageserver_auth_token: Option<String>,

View File

@@ -42,7 +42,7 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT
);
// use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr);
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr);
let me_conf: Config = me_connstr.parse().unwrap();
let (host, port) = connection_host_port(&me_conf);
let callme = format!(

View File

@@ -15,8 +15,11 @@ use std::cmp::min;
use std::io;
use std::io::Read;
use lazy_static::lazy_static;
use crate::replication::HotStandbyFeedback;
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
use zenith_metrics::{register_gauge_vec, Gauge, GaugeVec};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::pq_proto::SystemId;
@@ -279,6 +282,38 @@ pub trait Storage {
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
}
lazy_static! {
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
// i64 is faster than f64, so update to u64 when available.
static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_flush_lsn",
"Current flush_lsn, grouped by timeline",
&["ztli"]
)
.expect("Failed to register safekeeper_flush_lsn int gauge vec");
static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_commit_lsn",
"Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
&["ztli"]
)
.expect("Failed to register safekeeper_commit_lsn int gauge vec");
}
struct SafeKeeperMetrics {
flush_lsn: Gauge,
commit_lsn: Gauge,
}
impl SafeKeeperMetrics {
fn new(ztli: ZTimelineId) -> SafeKeeperMetrics {
let ztli_str = format!("{}", ztli);
SafeKeeperMetrics {
flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&ztli_str]),
commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&ztli_str]),
}
}
}
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
pub struct SafeKeeper<ST: Storage> {
@@ -286,6 +321,8 @@ pub struct SafeKeeper<ST: Storage> {
/// Established by reading wal.
pub flush_lsn: Lsn,
pub tli: u32,
// Cached metrics so we don't have to recompute labels on each update.
metrics: Option<SafeKeeperMetrics>,
/// not-yet-flushed pairs of same named fields in s.*
pub commit_lsn: Lsn,
pub truncate_lsn: Lsn,
@@ -304,6 +341,7 @@ where
SafeKeeper {
flush_lsn,
tli,
metrics: None,
commit_lsn: state.commit_lsn,
truncate_lsn: state.truncate_lsn,
storage,
@@ -355,6 +393,8 @@ where
self.s.server.wal_seg_size = msg.wal_seg_size;
self.storage.persist(&self.s, true)?;
self.metrics = Some(SafeKeeperMetrics::new(self.s.server.ztli));
info!(
"processed greeting from proposer {:?}, sending term {:?}",
msg.proposer_id, self.s.acceptor_state.term
@@ -478,6 +518,11 @@ where
}
if last_rec_lsn > self.flush_lsn {
self.flush_lsn = last_rec_lsn;
self.metrics
.as_ref()
.unwrap()
.flush_lsn
.set(u64::from(self.flush_lsn) as f64);
}
// Advance commit_lsn taking into account what we have locally. xxx this
@@ -495,6 +540,11 @@ where
sync_control_file |=
commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn;
self.commit_lsn = commit_lsn;
self.metrics
.as_ref()
.unwrap()
.commit_lsn
.set(u64::from(self.commit_lsn) as f64);
}
self.truncate_lsn = msg.h.truncate_lsn;

View File

@@ -13,9 +13,9 @@ use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
pub fn thread_main(conf: WalAcceptorConf) -> Result<()> {
info!("Starting wal acceptor on {}", conf.listen_addr);
let listener = TcpListener::bind(conf.listen_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_addr, e);
info!("Starting wal acceptor on {}", conf.listen_pg_addr);
let listener = TcpListener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
e
})?;

View File

@@ -5,6 +5,8 @@
use lazy_static::lazy_static;
use once_cell::race::OnceBox;
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_gauge, Gauge};
pub use prometheus::{register_gauge_vec, GaugeVec};
pub use prometheus::{register_histogram, Histogram};
pub use prometheus::{register_histogram_vec, HistogramVec};
pub use prometheus::{register_int_counter, IntCounter};