mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
Measure cross-AZ traffic in safekeepers (#3806)
Create `safekeeper_pg_io_bytes_total` metric to track total amount of bytes written/read in a postgres connections to safekeepers. This metric has the following labels: - `client_az` – availability zone of the connection initiator, or `"unknown"` - `sk_az` – availability zone of the safekeeper, or `"unknown"` - `app_name` – `application_name` of the postgres client - `dir` – data direction, either `"read"` or `"write"` - `same_az` – `"true"`, `"false"` or `"unknown"`. Can be derived from `client_az` and `sk_az`, exists purely for convenience. This is implemented by passing availability zone in the connection string, like this: `-c tenant_id=AAA timeline_id=BBB availability-zone=AZ-1`. Update ansible deployment scripts to add availability_zone argument to safekeeper and pageserver in systemd service files.
This commit is contained in:
committed by
GitHub
parent
768c8d9972
commit
b067378d0d
18
.github/ansible/deploy.yaml
vendored
18
.github/ansible/deploy.yaml
vendored
@@ -91,6 +91,15 @@
|
||||
tags:
|
||||
- pageserver
|
||||
|
||||
# used in `pageserver.service` template
|
||||
- name: learn current availability_zone
|
||||
shell:
|
||||
cmd: "curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone"
|
||||
register: ec2_availability_zone
|
||||
|
||||
- set_fact:
|
||||
ec2_availability_zone={{ ec2_availability_zone.stdout }}
|
||||
|
||||
- name: upload systemd service definition
|
||||
ansible.builtin.template:
|
||||
src: systemd/pageserver.service
|
||||
@@ -153,6 +162,15 @@
|
||||
tags:
|
||||
- safekeeper
|
||||
|
||||
# used in `safekeeper.service` template
|
||||
- name: learn current availability_zone
|
||||
shell:
|
||||
cmd: "curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone"
|
||||
register: ec2_availability_zone
|
||||
|
||||
- set_fact:
|
||||
ec2_availability_zone={{ ec2_availability_zone.stdout }}
|
||||
|
||||
# in the future safekeepers should discover pageservers byself
|
||||
# but currently use first pageserver that was discovered
|
||||
- name: set first pageserver var for safekeepers
|
||||
|
||||
2
.github/ansible/systemd/pageserver.service
vendored
2
.github/ansible/systemd/pageserver.service
vendored
@@ -6,7 +6,7 @@ After=network.target auditd.service
|
||||
Type=simple
|
||||
User=pageserver
|
||||
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_PAGESERVER }} SENTRY_ENVIRONMENT={{ sentry_environment }}
|
||||
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoint='{{ broker_endpoint }}'" -D /storage/pageserver/data
|
||||
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoint='{{ broker_endpoint }}'" -c "availability_zone='{{ ec2_availability_zone }}'" -D /storage/pageserver/data
|
||||
ExecReload=/bin/kill -HUP $MAINPID
|
||||
KillMode=mixed
|
||||
KillSignal=SIGINT
|
||||
|
||||
2
.github/ansible/systemd/safekeeper.service
vendored
2
.github/ansible/systemd/safekeeper.service
vendored
@@ -6,7 +6,7 @@ After=network.target auditd.service
|
||||
Type=simple
|
||||
User=safekeeper
|
||||
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_SAFEKEEPER }} SENTRY_ENVIRONMENT={{ sentry_environment }}
|
||||
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoint={{ broker_endpoint }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}'
|
||||
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoint={{ broker_endpoint }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}' --availability-zone={{ ec2_availability_zone }}
|
||||
ExecReload=/bin/kill -HUP $MAINPID
|
||||
KillMode=mixed
|
||||
KillSignal=SIGINT
|
||||
|
||||
@@ -115,6 +115,10 @@ impl SafekeeperNode {
|
||||
let datadir = self.datadir_path();
|
||||
|
||||
let id_string = id.to_string();
|
||||
// TODO: add availability_zone to the config.
|
||||
// Right now we just specify any value here and use it to check metrics in tests.
|
||||
let availability_zone = format!("sk-{}", id_string);
|
||||
|
||||
let mut args = vec![
|
||||
"-D",
|
||||
datadir.to_str().with_context(|| {
|
||||
@@ -126,6 +130,8 @@ impl SafekeeperNode {
|
||||
&listen_pg,
|
||||
"--listen-http",
|
||||
&listen_http,
|
||||
"--availability-zone",
|
||||
&availability_zone,
|
||||
];
|
||||
if !self.conf.sync {
|
||||
args.push("--no-sync");
|
||||
|
||||
@@ -118,6 +118,9 @@ pub struct PageServerConf {
|
||||
/// Example (default): 127.0.0.1:9898
|
||||
pub listen_http_addr: String,
|
||||
|
||||
/// Current availability zone. Used for traffic metrics.
|
||||
pub availability_zone: Option<String>,
|
||||
|
||||
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
|
||||
pub wait_lsn_timeout: Duration,
|
||||
// How long to wait for WAL redo to complete.
|
||||
@@ -202,6 +205,8 @@ struct PageServerConfigBuilder {
|
||||
|
||||
listen_http_addr: BuilderValue<String>,
|
||||
|
||||
availability_zone: BuilderValue<Option<String>>,
|
||||
|
||||
wait_lsn_timeout: BuilderValue<Duration>,
|
||||
wal_redo_timeout: BuilderValue<Duration>,
|
||||
|
||||
@@ -247,6 +252,7 @@ impl Default for PageServerConfigBuilder {
|
||||
Self {
|
||||
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
availability_zone: Set(None),
|
||||
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
|
||||
.expect("cannot parse default wait lsn timeout")),
|
||||
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
|
||||
@@ -303,6 +309,10 @@ impl PageServerConfigBuilder {
|
||||
self.listen_http_addr = BuilderValue::Set(listen_http_addr)
|
||||
}
|
||||
|
||||
pub fn availability_zone(&mut self, availability_zone: Option<String>) {
|
||||
self.availability_zone = BuilderValue::Set(availability_zone)
|
||||
}
|
||||
|
||||
pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
|
||||
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
|
||||
}
|
||||
@@ -414,6 +424,9 @@ impl PageServerConfigBuilder {
|
||||
listen_http_addr: self
|
||||
.listen_http_addr
|
||||
.ok_or(anyhow!("missing listen_http_addr"))?,
|
||||
availability_zone: self
|
||||
.availability_zone
|
||||
.ok_or(anyhow!("missing availability_zone"))?,
|
||||
wait_lsn_timeout: self
|
||||
.wait_lsn_timeout
|
||||
.ok_or(anyhow!("missing wait_lsn_timeout"))?,
|
||||
@@ -614,6 +627,7 @@ impl PageServerConf {
|
||||
match key {
|
||||
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
|
||||
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
|
||||
"availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
|
||||
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
|
||||
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
|
||||
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
|
||||
@@ -779,6 +793,7 @@ impl PageServerConf {
|
||||
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
availability_zone: None,
|
||||
superuser: "cloud_admin".to_string(),
|
||||
workdir: repo_dir,
|
||||
pg_distrib_dir,
|
||||
@@ -961,6 +976,7 @@ log_format = 'json'
|
||||
id: NodeId(10),
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
availability_zone: None,
|
||||
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
|
||||
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
|
||||
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
|
||||
@@ -1019,6 +1035,7 @@ log_format = 'json'
|
||||
id: NodeId(10),
|
||||
listen_pg_addr: "127.0.0.1:64000".to_string(),
|
||||
listen_http_addr: "127.0.0.1:9898".to_string(),
|
||||
availability_zone: None,
|
||||
wait_lsn_timeout: Duration::from_secs(111),
|
||||
wal_redo_timeout: Duration::from_secs(111),
|
||||
superuser: "zzzz".to_string(),
|
||||
|
||||
@@ -1335,6 +1335,7 @@ impl Timeline {
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
|
||||
self.conf.availability_zone.clone(),
|
||||
background_ctx,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -45,6 +45,7 @@ pub fn spawn_connection_manager_task(
|
||||
lagging_wal_timeout: Duration,
|
||||
max_lsn_wal_lag: NonZeroU64,
|
||||
auth_token: Option<Arc<String>>,
|
||||
availability_zone: Option<String>,
|
||||
ctx: RequestContext,
|
||||
) {
|
||||
let mut broker_client = get_broker_client().clone();
|
||||
@@ -67,6 +68,7 @@ pub fn spawn_connection_manager_task(
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
auth_token,
|
||||
availability_zone,
|
||||
);
|
||||
loop {
|
||||
select! {
|
||||
@@ -334,6 +336,7 @@ struct WalreceiverState {
|
||||
/// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
|
||||
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
|
||||
auth_token: Option<Arc<String>>,
|
||||
availability_zone: Option<String>,
|
||||
}
|
||||
|
||||
/// Current connection data.
|
||||
@@ -381,6 +384,7 @@ impl WalreceiverState {
|
||||
lagging_wal_timeout: Duration,
|
||||
max_lsn_wal_lag: NonZeroU64,
|
||||
auth_token: Option<Arc<String>>,
|
||||
availability_zone: Option<String>,
|
||||
) -> Self {
|
||||
let id = TenantTimelineId {
|
||||
tenant_id: timeline.tenant_id,
|
||||
@@ -396,6 +400,7 @@ impl WalreceiverState {
|
||||
wal_stream_candidates: HashMap::new(),
|
||||
wal_connection_retries: HashMap::new(),
|
||||
auth_token,
|
||||
availability_zone,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -740,6 +745,7 @@ impl WalreceiverState {
|
||||
None => None,
|
||||
Some(x) => Some(x),
|
||||
},
|
||||
self.availability_zone.as_deref(),
|
||||
) {
|
||||
Ok(connstr) => Some((*sk_id, info, connstr)),
|
||||
Err(e) => {
|
||||
@@ -824,17 +830,24 @@ fn wal_stream_connection_config(
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
Ok(PgConnectionConfig::new_host_port(host, port)
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned())))
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
Ok(connstr)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -1273,6 +1286,7 @@ mod tests {
|
||||
wal_stream_candidates: HashMap::new(),
|
||||
wal_connection_retries: HashMap::new(),
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,6 +71,9 @@ struct Args {
|
||||
/// Listen http endpoint for management and metrics in the form host:port.
|
||||
#[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
|
||||
listen_http: String,
|
||||
/// Availability zone of the safekeeper.
|
||||
#[arg(long)]
|
||||
availability_zone: Option<String>,
|
||||
/// Do not wait for changes to be written safely to disk. Unsafe.
|
||||
#[arg(short, long)]
|
||||
no_sync: bool,
|
||||
@@ -166,6 +169,7 @@ fn main() -> anyhow::Result<()> {
|
||||
my_id: id,
|
||||
listen_pg_addr: args.listen_pg,
|
||||
listen_http_addr: args.listen_http,
|
||||
availability_zone: args.availability_zone,
|
||||
no_sync: args.no_sync,
|
||||
broker_endpoint: args.broker_endpoint,
|
||||
broker_keepalive_interval: args.broker_keepalive_interval,
|
||||
|
||||
@@ -9,6 +9,7 @@ use tracing::{info, info_span, Instrument};
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
|
||||
use crate::metrics::TrafficMetrics;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use postgres_backend::QueryError;
|
||||
@@ -33,6 +34,7 @@ pub struct SafekeeperPostgresHandler {
|
||||
/// Unique connection id is logged in spans for observability.
|
||||
pub conn_id: ConnectionId,
|
||||
claims: Option<Claims>,
|
||||
io_metrics: Option<TrafficMetrics>,
|
||||
}
|
||||
|
||||
/// Parsed Postgres command.
|
||||
@@ -94,6 +96,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
format!("Failed to parse {value} as timeline id")
|
||||
})?);
|
||||
}
|
||||
Some(("availability_zone", client_az)) => {
|
||||
if let Some(metrics) = self.io_metrics.as_ref() {
|
||||
metrics.set_client_az(client_az)
|
||||
}
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
@@ -101,6 +108,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
|
||||
if let Some(app_name) = params.get("application_name") {
|
||||
self.appname = Some(app_name.to_owned());
|
||||
if let Some(metrics) = self.io_metrics.as_ref() {
|
||||
metrics.set_app_name(app_name)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -187,7 +197,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
}
|
||||
|
||||
impl SafekeeperPostgresHandler {
|
||||
pub fn new(conf: SafeKeeperConf, conn_id: u32) -> Self {
|
||||
pub fn new(conf: SafeKeeperConf, conn_id: u32, io_metrics: Option<TrafficMetrics>) -> Self {
|
||||
SafekeeperPostgresHandler {
|
||||
conf,
|
||||
appname: None,
|
||||
@@ -196,6 +206,7 @@ impl SafekeeperPostgresHandler {
|
||||
ttid: TenantTimelineId::empty(),
|
||||
conn_id,
|
||||
claims: None,
|
||||
io_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -52,6 +52,7 @@ pub struct SafeKeeperConf {
|
||||
pub my_id: NodeId,
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub availability_zone: Option<String>,
|
||||
pub no_sync: bool,
|
||||
pub broker_endpoint: Uri,
|
||||
pub broker_keepalive_interval: Duration,
|
||||
@@ -82,6 +83,7 @@ impl SafeKeeperConf {
|
||||
no_sync: false,
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
availability_zone: None,
|
||||
remote_storage: None,
|
||||
my_id: NodeId(0),
|
||||
broker_endpoint: storage_broker::DEFAULT_ENDPOINT
|
||||
|
||||
@@ -1,15 +1,19 @@
|
||||
//! Global safekeeper mertics and per-timeline safekeeper metrics.
|
||||
|
||||
use std::time::{Instant, SystemTime};
|
||||
use std::{
|
||||
sync::{Arc, RwLock},
|
||||
time::{Instant, SystemTime},
|
||||
};
|
||||
|
||||
use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
|
||||
use anyhow::Result;
|
||||
use metrics::{
|
||||
core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
|
||||
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
|
||||
proto::MetricFamily,
|
||||
register_int_counter_vec, Gauge, IntCounterVec, IntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
@@ -63,13 +67,132 @@ pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
|
||||
});
|
||||
pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"safekeeper_pg_io_bytes",
|
||||
"safekeeper_pg_io_bytes_total",
|
||||
"Bytes read from or written to any PostgreSQL connection",
|
||||
&["direction"]
|
||||
&["client_az", "sk_az", "app_name", "dir", "same_az"]
|
||||
)
|
||||
.expect("Failed to register safekeeper_pg_io_bytes gauge")
|
||||
});
|
||||
|
||||
pub const LABEL_UNKNOWN: &str = "unknown";
|
||||
|
||||
/// Labels for traffic metrics.
|
||||
#[derive(Clone)]
|
||||
struct ConnectionLabels {
|
||||
/// Availability zone of the connection origin.
|
||||
client_az: String,
|
||||
/// Availability zone of the current safekeeper.
|
||||
sk_az: String,
|
||||
/// Client application name.
|
||||
app_name: String,
|
||||
}
|
||||
|
||||
impl ConnectionLabels {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
client_az: LABEL_UNKNOWN.to_string(),
|
||||
sk_az: LABEL_UNKNOWN.to_string(),
|
||||
app_name: LABEL_UNKNOWN.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_metrics(
|
||||
&self,
|
||||
) -> (
|
||||
GenericCounter<metrics::core::AtomicU64>,
|
||||
GenericCounter<metrics::core::AtomicU64>,
|
||||
) {
|
||||
let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
|
||||
(LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
|
||||
(client_az, sk_az) => {
|
||||
if client_az == sk_az {
|
||||
"true"
|
||||
} else {
|
||||
"false"
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let read = PG_IO_BYTES.with_label_values(&[
|
||||
&self.client_az,
|
||||
&self.sk_az,
|
||||
&self.app_name,
|
||||
"read",
|
||||
same_az,
|
||||
]);
|
||||
let write = PG_IO_BYTES.with_label_values(&[
|
||||
&self.client_az,
|
||||
&self.sk_az,
|
||||
&self.app_name,
|
||||
"write",
|
||||
same_az,
|
||||
]);
|
||||
(read, write)
|
||||
}
|
||||
}
|
||||
|
||||
struct TrafficMetricsState {
|
||||
/// Labels for traffic metrics.
|
||||
labels: ConnectionLabels,
|
||||
/// Total bytes read from this connection.
|
||||
read: GenericCounter<metrics::core::AtomicU64>,
|
||||
/// Total bytes written to this connection.
|
||||
write: GenericCounter<metrics::core::AtomicU64>,
|
||||
}
|
||||
|
||||
/// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
|
||||
#[derive(Clone)]
|
||||
pub struct TrafficMetrics {
|
||||
state: Arc<RwLock<TrafficMetricsState>>,
|
||||
}
|
||||
|
||||
impl Default for TrafficMetrics {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl TrafficMetrics {
|
||||
pub fn new() -> Self {
|
||||
let labels = ConnectionLabels::new();
|
||||
let (read, write) = labels.build_metrics();
|
||||
let state = TrafficMetricsState {
|
||||
labels,
|
||||
read,
|
||||
write,
|
||||
};
|
||||
Self {
|
||||
state: Arc::new(RwLock::new(state)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_client_az(&self, value: &str) {
|
||||
let mut state = self.state.write().unwrap();
|
||||
state.labels.client_az = value.to_string();
|
||||
(state.read, state.write) = state.labels.build_metrics();
|
||||
}
|
||||
|
||||
pub fn set_sk_az(&self, value: &str) {
|
||||
let mut state = self.state.write().unwrap();
|
||||
state.labels.sk_az = value.to_string();
|
||||
(state.read, state.write) = state.labels.build_metrics();
|
||||
}
|
||||
|
||||
pub fn set_app_name(&self, value: &str) {
|
||||
let mut state = self.state.write().unwrap();
|
||||
state.labels.app_name = value.to_string();
|
||||
(state.read, state.write) = state.labels.build_metrics();
|
||||
}
|
||||
|
||||
pub fn observe_read(&self, cnt: usize) {
|
||||
self.state.read().unwrap().read.inc_by(cnt as u64)
|
||||
}
|
||||
|
||||
pub fn observe_write(&self, cnt: usize) {
|
||||
self.state.read().unwrap().write.inc_by(cnt as u64)
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for WalStorage in a single timeline.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct WalStorageMetrics {
|
||||
|
||||
@@ -9,8 +9,9 @@ use tokio::net::TcpStream;
|
||||
use tracing::*;
|
||||
use utils::measured_stream::MeasuredStream;
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::metrics::TrafficMetrics;
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::{handler::SafekeeperPostgresHandler, metrics::PG_IO_BYTES};
|
||||
use postgres_backend::{AuthType, PostgresBackend};
|
||||
|
||||
/// Accept incoming TCP connections and spawn them into a background thread.
|
||||
@@ -68,20 +69,21 @@ fn handle_socket(
|
||||
.build()?;
|
||||
let local = tokio::task::LocalSet::new();
|
||||
|
||||
let read_metrics = PG_IO_BYTES.with_label_values(&["read"]);
|
||||
let write_metrics = PG_IO_BYTES.with_label_values(&["write"]);
|
||||
|
||||
socket.set_nodelay(true)?;
|
||||
let peer_addr = socket.peer_addr()?;
|
||||
|
||||
// TODO: measure cross-az traffic
|
||||
let traffic_metrics = TrafficMetrics::new();
|
||||
if let Some(current_az) = conf.availability_zone.as_deref() {
|
||||
traffic_metrics.set_sk_az(current_az);
|
||||
}
|
||||
|
||||
let socket = MeasuredStream::new(
|
||||
socket,
|
||||
|cnt| {
|
||||
read_metrics.inc_by(cnt as u64);
|
||||
traffic_metrics.observe_read(cnt);
|
||||
},
|
||||
|cnt| {
|
||||
write_metrics.inc_by(cnt as u64);
|
||||
traffic_metrics.observe_write(cnt);
|
||||
},
|
||||
);
|
||||
|
||||
@@ -89,7 +91,8 @@ fn handle_socket(
|
||||
None => AuthType::Trust,
|
||||
Some(_) => AuthType::NeonJWT,
|
||||
};
|
||||
let mut conn_handler = SafekeeperPostgresHandler::new(conf, conn_id);
|
||||
let mut conn_handler =
|
||||
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
// libpq protocol between safekeeper and walproposer / pageserver
|
||||
// We don't use shutdown.
|
||||
|
||||
@@ -3,6 +3,7 @@ import shutil
|
||||
import time
|
||||
from contextlib import closing
|
||||
from datetime import datetime
|
||||
from itertools import chain
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
@@ -87,6 +88,7 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.pageserver_config_override = "availability_zone='test_ps_az'"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_1, _ = env.neon_cli.create_tenant()
|
||||
@@ -122,6 +124,17 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
ps_metrics = all_metrics[0]
|
||||
sk_metrics = all_metrics[1:]
|
||||
|
||||
# Find all metrics among all safekeepers, accepts the same arguments as query_all()
|
||||
def query_all_safekeepers(name, filter):
|
||||
return list(
|
||||
chain.from_iterable(
|
||||
map(
|
||||
lambda sk: sk.query_all(name, filter),
|
||||
sk_metrics,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
ttids = [
|
||||
{"tenant_id": str(tenant_1), "timeline_id": str(timeline_1)},
|
||||
{"tenant_id": str(tenant_2), "timeline_id": str(timeline_2)},
|
||||
@@ -162,6 +175,40 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
f"process_start_time_seconds (UTC): {datetime.fromtimestamp(metrics.query_one('process_start_time_seconds').value)}"
|
||||
)
|
||||
|
||||
for io_direction in ["read", "write"]:
|
||||
# Querying all metrics for number of bytes read/written by pageserver in another AZ
|
||||
io_metrics = query_all_safekeepers(
|
||||
"safekeeper_pg_io_bytes_total",
|
||||
{
|
||||
"app_name": "pageserver",
|
||||
"client_az": "test_ps_az",
|
||||
"dir": io_direction,
|
||||
"same_az": "false",
|
||||
},
|
||||
)
|
||||
total_bytes = sum(int(metric.value) for metric in io_metrics)
|
||||
log.info(f"Pageserver {io_direction} bytes from another AZ: {total_bytes}")
|
||||
# We expect some bytes to be read/written, to make sure metrics are working
|
||||
assert total_bytes > 0
|
||||
|
||||
# Test (a subset of) safekeeper global metrics
|
||||
for sk_m in sk_metrics:
|
||||
# Test that every safekeeper has read some bytes
|
||||
assert any(
|
||||
map(
|
||||
lambda x: x.value > 0,
|
||||
sk_m.query_all("safekeeper_pg_io_bytes_total", {"dir": "read"}),
|
||||
)
|
||||
), f"{sk_m.name} has not read bytes"
|
||||
|
||||
# Test that every safekeeper has written some bytes
|
||||
assert any(
|
||||
map(
|
||||
lambda x: x.value > 0,
|
||||
sk_m.query_all("safekeeper_pg_io_bytes_total", {"dir": "write"}),
|
||||
)
|
||||
), f"{sk_m.name} has not written bytes"
|
||||
|
||||
# Test (a subset of) pageserver global metrics
|
||||
for metric in PAGESERVER_GLOBAL_METRICS:
|
||||
ps_samples = ps_metrics.query_all(metric, {})
|
||||
|
||||
Reference in New Issue
Block a user