From b067378d0d4f2e5ba39fd4ddf8bcedb6ef75ce98 Mon Sep 17 00:00:00 2001
From: Arthur Petukhovsky
Date: Thu, 16 Mar 2023 16:24:01 +0200
Subject: [PATCH] Measure cross-AZ traffic in safekeepers (#3806)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Create a `safekeeper_pg_io_bytes_total` metric to track the total number
of bytes written to or read from postgres connections to safekeepers.
The metric has the following labels:
- `client_az` – availability zone of the connection initiator, or `"unknown"`
- `sk_az` – availability zone of the safekeeper, or `"unknown"`
- `app_name` – `application_name` of the postgres client
- `dir` – data direction, either `"read"` or `"write"`
- `same_az` – `"true"`, `"false"` or `"unknown"`; derivable from `client_az`
  and `sk_az`, it exists purely for convenience

This is implemented by passing the availability zone in the connection
string, like this: `-c tenant_id=AAA timeline_id=BBB availability_zone=AZ-1`.

Also update the ansible deployment scripts to pass an availability-zone
argument to the safekeeper and pageserver in their systemd service files.
---
 .github/ansible/deploy.yaml                 |  18 +++
 .github/ansible/systemd/pageserver.service  |   2 +-
 .github/ansible/systemd/safekeeper.service  |   2 +-
 control_plane/src/safekeeper.rs             |   6 +
 pageserver/src/config.rs                    |  17 +++
 pageserver/src/tenant/timeline.rs           |   1 +
 .../walreceiver/connection_manager.rs       |  18 ++-
 safekeeper/src/bin/safekeeper.rs            |   4 +
 safekeeper/src/handler.rs                   |  13 +-
 safekeeper/src/lib.rs                       |   2 +
 safekeeper/src/metrics.rs                   | 131 +++++++++++++++++-
 safekeeper/src/wal_service.rs               |  19 +--
 test_runner/regress/test_tenants.py         |  47 +++++++
 13 files changed, 263 insertions(+), 17 deletions(-)
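To make the mechanism concrete before the diffs: the safekeeper receives that `options` string in the startup packet and picks `availability_zone` out of it. Below is a minimal, runnable sketch of such an extraction; the splitting rules are an assumption for illustration, since the real handler in `safekeeper/src/handler.rs` is handed already-parsed key/value pairs.

```rust
/// Hypothetical helper (not part of the patch): extract `availability_zone`
/// from a libpq `options` string such as
/// `-c tenant_id=AAA timeline_id=BBB availability_zone=AZ-1`.
fn client_az_from_options(options: &str) -> Option<&str> {
    options
        .split_whitespace()
        .filter_map(|kv| kv.split_once('=')) // drops bare flags like `-c`
        .find_map(|(k, v)| (k == "availability_zone").then_some(v))
}

fn main() {
    let opts = "-c tenant_id=AAA timeline_id=BBB availability_zone=AZ-1";
    assert_eq!(client_az_from_options(opts), Some("AZ-1"));
}
```

Once the labels are exported, cross-AZ volume can be isolated at query time, e.g. with something like `sum by (dir) (rate(safekeeper_pg_io_bytes_total{same_az="false"}[5m]))` in Prometheus.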
"pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoint='{{ broker_endpoint }}'" -c "availability_zone='{{ ec2_availability_zone }}'" -D /storage/pageserver/data ExecReload=/bin/kill -HUP $MAINPID KillMode=mixed KillSignal=SIGINT diff --git a/.github/ansible/systemd/safekeeper.service b/.github/ansible/systemd/safekeeper.service index d7d8d26b1a..a8ef4bc0f5 100644 --- a/.github/ansible/systemd/safekeeper.service +++ b/.github/ansible/systemd/safekeeper.service @@ -6,7 +6,7 @@ After=network.target auditd.service Type=simple User=safekeeper Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_SAFEKEEPER }} SENTRY_ENVIRONMENT={{ sentry_environment }} -ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoint={{ broker_endpoint }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}' +ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoint={{ broker_endpoint }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}' --availability-zone={{ ec2_availability_zone }} ExecReload=/bin/kill -HUP $MAINPID KillMode=mixed KillSignal=SIGINT diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 4c0812a5e3..228968be80 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -115,6 +115,10 @@ impl SafekeeperNode { let datadir = self.datadir_path(); let id_string = id.to_string(); + // TODO: add availability_zone to the config. + // Right now we just specify any value here and use it to check metrics in tests. + let availability_zone = format!("sk-{}", id_string); + let mut args = vec![ "-D", datadir.to_str().with_context(|| { @@ -126,6 +130,8 @@ impl SafekeeperNode { &listen_pg, "--listen-http", &listen_http, + "--availability-zone", + &availability_zone, ]; if !self.conf.sync { args.push("--no-sync"); diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index d17f0bc143..16f35355af 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -118,6 +118,9 @@ pub struct PageServerConf { /// Example (default): 127.0.0.1:9898 pub listen_http_addr: String, + /// Current availability zone. Used for traffic metrics. + pub availability_zone: Option, + // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. pub wait_lsn_timeout: Duration, // How long to wait for WAL redo to complete. 
@@ -202,6 +205,8 @@ struct PageServerConfigBuilder {
     listen_http_addr: BuilderValue<String>,
 
+    availability_zone: BuilderValue<Option<String>>,
+
     wait_lsn_timeout: BuilderValue<Duration>,
     wal_redo_timeout: BuilderValue<Duration>,
 
@@ -247,6 +252,7 @@ impl Default for PageServerConfigBuilder {
         Self {
             listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
             listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
+            availability_zone: Set(None),
             wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
                 .expect("cannot parse default wait lsn timeout")),
             wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
@@ -303,6 +309,10 @@ impl PageServerConfigBuilder {
         self.listen_http_addr = BuilderValue::Set(listen_http_addr)
     }
 
+    pub fn availability_zone(&mut self, availability_zone: Option<String>) {
+        self.availability_zone = BuilderValue::Set(availability_zone)
+    }
+
     pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
         self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
     }
@@ -414,6 +424,9 @@ impl PageServerConfigBuilder {
             listen_http_addr: self
                 .listen_http_addr
                 .ok_or(anyhow!("missing listen_http_addr"))?,
+            availability_zone: self
+                .availability_zone
+                .ok_or(anyhow!("missing availability_zone"))?,
             wait_lsn_timeout: self
                 .wait_lsn_timeout
                 .ok_or(anyhow!("missing wait_lsn_timeout"))?,
@@ -614,6 +627,7 @@ impl PageServerConf {
         match key {
             "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
             "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
+            "availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
             "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
             "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
             "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
@@ -779,6 +793,7 @@ impl PageServerConf {
             max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
             listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
             listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
+            availability_zone: None,
             superuser: "cloud_admin".to_string(),
             workdir: repo_dir,
             pg_distrib_dir,
@@ -961,6 +976,7 @@ log_format = 'json'
             id: NodeId(10),
             listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
             listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
+            availability_zone: None,
             wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
             wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
             superuser: defaults::DEFAULT_SUPERUSER.to_string(),
@@ -1019,6 +1035,7 @@ log_format = 'json'
             id: NodeId(10),
             listen_pg_addr: "127.0.0.1:64000".to_string(),
             listen_http_addr: "127.0.0.1:9898".to_string(),
+            availability_zone: None,
             wait_lsn_timeout: Duration::from_secs(111),
             wal_redo_timeout: Duration::from_secs(111),
             superuser: "zzzz".to_string(),
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index fff288c683..0c42ed3079 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1335,6 +1335,7 @@ impl Timeline {
                 lagging_wal_timeout,
                 max_lsn_wal_lag,
                 crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
+                self.conf.availability_zone.clone(),
                 background_ctx,
             );
         }
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index 64a79b6d1b..0c770136db 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -45,6 +45,7 @@ pub fn spawn_connection_manager_task(
     lagging_wal_timeout: Duration,
     max_lsn_wal_lag: NonZeroU64,
     auth_token: Option<Arc<String>>,
+    availability_zone: Option<String>,
     ctx: RequestContext,
 ) {
     let mut broker_client = get_broker_client().clone();
@@ -67,6 +68,7 @@ pub fn spawn_connection_manager_task(
                 lagging_wal_timeout,
                 max_lsn_wal_lag,
                 auth_token,
+                availability_zone,
             );
             loop {
                 select! {
@@ -334,6 +336,7 @@ struct WalreceiverState {
     /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     auth_token: Option<Arc<String>>,
+    availability_zone: Option<String>,
 }
 
 /// Current connection data.
@@ -381,6 +384,7 @@ impl WalreceiverState {
         lagging_wal_timeout: Duration,
         max_lsn_wal_lag: NonZeroU64,
         auth_token: Option<Arc<String>>,
+        availability_zone: Option<String>,
     ) -> Self {
         let id = TenantTimelineId {
             tenant_id: timeline.tenant_id,
@@ -396,6 +400,7 @@
             wal_stream_candidates: HashMap::new(),
             wal_connection_retries: HashMap::new(),
             auth_token,
+            availability_zone,
         }
     }
@@ -740,6 +745,7 @@ impl WalreceiverState {
                     None => None,
                     Some(x) => Some(x),
                 },
+                self.availability_zone.as_deref(),
             ) {
                 Ok(connstr) => Some((*sk_id, info, connstr)),
                 Err(e) => {
@@ -824,17 +830,24 @@ fn wal_stream_connection_config(
     }: TenantTimelineId,
     listen_pg_addr_str: &str,
     auth_token: Option<&str>,
+    availability_zone: Option<&str>,
 ) -> anyhow::Result<PgConnectionConfig> {
     let (host, port) =
         parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
     let port = port.unwrap_or(5432);
-    Ok(PgConnectionConfig::new_host_port(host, port)
+    let mut connstr = PgConnectionConfig::new_host_port(host, port)
         .extend_options([
             "-c".to_owned(),
             format!("timeline_id={}", timeline_id),
             format!("tenant_id={}", tenant_id),
         ])
-        .set_password(auth_token.map(|s| s.to_owned())))
+        .set_password(auth_token.map(|s| s.to_owned()));
+
+    if let Some(availability_zone) = availability_zone {
+        connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
+    }
+
+    Ok(connstr)
 }
 
 #[cfg(test)]
@@ -1273,6 +1286,7 @@ mod tests {
                 wal_stream_candidates: HashMap::new(),
                 wal_connection_retries: HashMap::new(),
                 auth_token: None,
+                availability_zone: None,
             }
         }
     }
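To see what the pageserver actually sends after this change: with an AZ configured, `wal_stream_connection_config` appends one more key/value to the libpq `options` parameter. Below is a standalone sketch of the resulting string, using a hypothetical `build_options` helper in place of the `PgConnectionConfig::extend_options` builder above:

```rust
/// Hypothetical stand-in for the `PgConnectionConfig` builder calls above.
fn build_options(tenant_id: &str, timeline_id: &str, az: Option<&str>) -> String {
    let mut opts = vec![
        "-c".to_owned(),
        format!("timeline_id={timeline_id}"),
        format!("tenant_id={tenant_id}"),
    ];
    if let Some(az) = az {
        // Appended only when the pageserver has `availability_zone` configured,
        // mirroring the `if let Some(...)` in wal_stream_connection_config.
        opts.push(format!("availability_zone={az}"));
    }
    opts.join(" ")
}

fn main() {
    assert_eq!(
        build_options("AAA", "BBB", Some("AZ-1")),
        "-c timeline_id=BBB tenant_id=AAA availability_zone=AZ-1"
    );
}
```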
diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index d2cb9f79b9..848b955af8 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -71,6 +71,9 @@ struct Args {
     /// Listen http endpoint for management and metrics in the form host:port.
     #[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
     listen_http: String,
+    /// Availability zone of the safekeeper.
+    #[arg(long)]
+    availability_zone: Option<String>,
     /// Do not wait for changes to be written safely to disk. Unsafe.
     #[arg(short, long)]
     no_sync: bool,
@@ -166,6 +169,7 @@ fn main() -> anyhow::Result<()> {
         my_id: id,
         listen_pg_addr: args.listen_pg,
         listen_http_addr: args.listen_http,
+        availability_zone: args.availability_zone,
         no_sync: args.no_sync,
         broker_endpoint: args.broker_endpoint,
         broker_keepalive_interval: args.broker_keepalive_interval,
diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs
index 3b8434b2de..a589fe1869 100644
--- a/safekeeper/src/handler.rs
+++ b/safekeeper/src/handler.rs
@@ -9,6 +9,7 @@ use tracing::{info, info_span, Instrument};
 
 use crate::auth::check_permission;
 use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
+use crate::metrics::TrafficMetrics;
 use crate::wal_service::ConnectionId;
 use crate::{GlobalTimelines, SafeKeeperConf};
 use postgres_backend::QueryError;
@@ -33,6 +34,7 @@ pub struct SafekeeperPostgresHandler {
     /// Unique connection id is logged in spans for observability.
     pub conn_id: ConnectionId,
     claims: Option<Claims>,
+    io_metrics: Option<TrafficMetrics>,
 }
 
 /// Parsed Postgres command.
@@ -94,6 +96,11 @@ impl postgres_backend::Handler
                         format!("Failed to parse {value} as timeline id")
                     })?);
                 }
+                Some(("availability_zone", client_az)) => {
+                    if let Some(metrics) = self.io_metrics.as_ref() {
+                        metrics.set_client_az(client_az)
+                    }
+                }
                 _ => continue,
             }
         }
@@ -101,6 +108,9 @@
         if let Some(app_name) = params.get("application_name") {
             self.appname = Some(app_name.to_owned());
+            if let Some(metrics) = self.io_metrics.as_ref() {
+                metrics.set_app_name(app_name)
+            }
         }
 
         Ok(())
@@ -187,7 +197,7 @@
 }
 
 impl SafekeeperPostgresHandler {
-    pub fn new(conf: SafeKeeperConf, conn_id: u32) -> Self {
+    pub fn new(conf: SafeKeeperConf, conn_id: u32, io_metrics: Option<TrafficMetrics>) -> Self {
         SafekeeperPostgresHandler {
             conf,
             appname: None,
@@ -196,6 +206,7 @@
             ttid: TenantTimelineId::empty(),
             conn_id,
             claims: None,
+            io_metrics,
         }
     }
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index f4e753cdbf..2c28c5218d 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -52,6 +52,7 @@ pub struct SafeKeeperConf {
     pub my_id: NodeId,
     pub listen_pg_addr: String,
     pub listen_http_addr: String,
+    pub availability_zone: Option<String>,
     pub no_sync: bool,
     pub broker_endpoint: Uri,
     pub broker_keepalive_interval: Duration,
@@ -82,6 +83,7 @@ impl SafeKeeperConf {
             no_sync: false,
             listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
             listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
+            availability_zone: None,
             remote_storage: None,
             my_id: NodeId(0),
             broker_endpoint: storage_broker::DEFAULT_ENDPOINT
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
index 16aca24927..c3077b6dc5 100644
--- a/safekeeper/src/metrics.rs
+++ b/safekeeper/src/metrics.rs
@@ -1,15 +1,19 @@
 //! Global safekeeper mertics and per-timeline safekeeper metrics.
-use std::time::{Instant, SystemTime};
+use std::{
+    sync::{Arc, RwLock},
+    time::{Instant, SystemTime},
+};
 
 use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
 use anyhow::Result;
 use metrics::{
-    core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
+    core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
     proto::MetricFamily,
     register_int_counter_vec, Gauge, IntCounterVec, IntGaugeVec,
 };
 use once_cell::sync::Lazy;
+
 use postgres_ffi::XLogSegNo;
 use utils::{id::TenantTimelineId, lsn::Lsn};
 
@@ -63,13 +67,132 @@ pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
 });
 pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
-        "safekeeper_pg_io_bytes",
+        "safekeeper_pg_io_bytes_total",
         "Bytes read from or written to any PostgreSQL connection",
-        &["direction"]
+        &["client_az", "sk_az", "app_name", "dir", "same_az"]
     )
     .expect("Failed to register safekeeper_pg_io_bytes gauge")
 });
+
+pub const LABEL_UNKNOWN: &str = "unknown";
+
+/// Labels for traffic metrics.
+#[derive(Clone)]
+struct ConnectionLabels {
+    /// Availability zone of the connection origin.
+    client_az: String,
+    /// Availability zone of the current safekeeper.
+    sk_az: String,
+    /// Client application name.
+    app_name: String,
+}
+
+impl ConnectionLabels {
+    fn new() -> Self {
+        Self {
+            client_az: LABEL_UNKNOWN.to_string(),
+            sk_az: LABEL_UNKNOWN.to_string(),
+            app_name: LABEL_UNKNOWN.to_string(),
+        }
+    }
+
+    fn build_metrics(
+        &self,
+    ) -> (
+        GenericCounter<AtomicU64>,
+        GenericCounter<AtomicU64>,
+    ) {
+        let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
+            (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
+            (client_az, sk_az) => {
+                if client_az == sk_az {
+                    "true"
+                } else {
+                    "false"
+                }
+            }
+        };
+
+        let read = PG_IO_BYTES.with_label_values(&[
+            &self.client_az,
+            &self.sk_az,
+            &self.app_name,
+            "read",
+            same_az,
+        ]);
+        let write = PG_IO_BYTES.with_label_values(&[
+            &self.client_az,
+            &self.sk_az,
+            &self.app_name,
+            "write",
+            same_az,
+        ]);
+        (read, write)
+    }
+}
+
+struct TrafficMetricsState {
+    /// Labels for traffic metrics.
+    labels: ConnectionLabels,
+    /// Total bytes read from this connection.
+    read: GenericCounter<AtomicU64>,
+    /// Total bytes written to this connection.
+    write: GenericCounter<AtomicU64>,
+}
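The `same_az` match in `build_metrics` is the core of the labeling, so here it is restated as a free function with a couple of checks; same logic as above, with `"unknown"` winning whenever either side's AZ is missing (the AZ names in `main` are made up):

```rust
const LABEL_UNKNOWN: &str = "unknown";

// Same derivation as in ConnectionLabels::build_metrics above.
fn same_az(client_az: &str, sk_az: &str) -> &'static str {
    match (client_az, sk_az) {
        (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
        (c, s) if c == s => "true",
        _ => "false",
    }
}

fn main() {
    assert_eq!(same_az("eu-west-1a", "eu-west-1a"), "true");
    assert_eq!(same_az("eu-west-1a", "eu-west-1b"), "false");
    assert_eq!(same_az(LABEL_UNKNOWN, "eu-west-1b"), LABEL_UNKNOWN);
}
```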
+
+/// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
+#[derive(Clone)]
+pub struct TrafficMetrics {
+    state: Arc<RwLock<TrafficMetricsState>>,
+}
+
+impl Default for TrafficMetrics {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl TrafficMetrics {
+    pub fn new() -> Self {
+        let labels = ConnectionLabels::new();
+        let (read, write) = labels.build_metrics();
+        let state = TrafficMetricsState {
+            labels,
+            read,
+            write,
+        };
+        Self {
+            state: Arc::new(RwLock::new(state)),
+        }
+    }
+
+    pub fn set_client_az(&self, value: &str) {
+        let mut state = self.state.write().unwrap();
+        state.labels.client_az = value.to_string();
+        (state.read, state.write) = state.labels.build_metrics();
+    }
+
+    pub fn set_sk_az(&self, value: &str) {
+        let mut state = self.state.write().unwrap();
+        state.labels.sk_az = value.to_string();
+        (state.read, state.write) = state.labels.build_metrics();
+    }
+
+    pub fn set_app_name(&self, value: &str) {
+        let mut state = self.state.write().unwrap();
+        state.labels.app_name = value.to_string();
+        (state.read, state.write) = state.labels.build_metrics();
+    }
+
+    pub fn observe_read(&self, cnt: usize) {
+        self.state.read().unwrap().read.inc_by(cnt as u64)
+    }
+
+    pub fn observe_write(&self, cnt: usize) {
+        self.state.read().unwrap().write.inc_by(cnt as u64)
+    }
+}
+
 /// Metrics for WalStorage in a single timeline.
 #[derive(Clone, Default)]
 pub struct WalStorageMetrics {
diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs
index 5f58c4f7fc..22f50c3428 100644
--- a/safekeeper/src/wal_service.rs
+++ b/safekeeper/src/wal_service.rs
@@ -9,8 +9,9 @@ use tokio::net::TcpStream;
 use tracing::*;
 use utils::measured_stream::MeasuredStream;
 
+use crate::handler::SafekeeperPostgresHandler;
+use crate::metrics::TrafficMetrics;
 use crate::SafeKeeperConf;
-use crate::{handler::SafekeeperPostgresHandler, metrics::PG_IO_BYTES};
 use postgres_backend::{AuthType, PostgresBackend};
 
 /// Accept incoming TCP connections and spawn them into a background thread.
@@ -68,20 +69,21 @@ fn handle_socket(
         .build()?;
     let local = tokio::task::LocalSet::new();
 
-    let read_metrics = PG_IO_BYTES.with_label_values(&["read"]);
-    let write_metrics = PG_IO_BYTES.with_label_values(&["write"]);
-
     socket.set_nodelay(true)?;
     let peer_addr = socket.peer_addr()?;
-    // TODO: measure cross-az traffic
+    let traffic_metrics = TrafficMetrics::new();
+    if let Some(current_az) = conf.availability_zone.as_deref() {
+        traffic_metrics.set_sk_az(current_az);
+    }
+
     let socket = MeasuredStream::new(
         socket,
         |cnt| {
-            read_metrics.inc_by(cnt as u64);
+            traffic_metrics.observe_read(cnt);
         },
         |cnt| {
-            write_metrics.inc_by(cnt as u64);
+            traffic_metrics.observe_write(cnt);
         },
     );
 
@@ -89,7 +91,8 @@
         None => AuthType::Trust,
         Some(_) => AuthType::NeonJWT,
     };
-    let mut conn_handler = SafekeeperPostgresHandler::new(conf, conn_id);
+    let mut conn_handler =
+        SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
     // libpq protocol between safekeeper and walproposer / pageserver
     // We don't use shutdown.
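One design note: `TrafficMetrics` re-resolves its two counter handles only when a label changes, so the per-chunk `MeasuredStream` callbacks above are plain atomic adds rather than per-call label lookups. A minimal sketch of that caching pattern, assuming the upstream `prometheus` crate (which the workspace `metrics` crate appears to wrap):

```rust
use prometheus::{
    core::{AtomicU64, GenericCounter},
    register_int_counter_vec,
};

fn main() {
    let io_bytes = register_int_counter_vec!(
        "demo_pg_io_bytes_total", // demo name, not the real metric
        "Demo: bytes transferred per direction",
        &["dir"]
    )
    .unwrap();

    // Resolve the labelled child once per connection (a hash lookup)...
    let read: GenericCounter<AtomicU64> = io_bytes.with_label_values(&["read"]);

    // ...then every read callback is just an atomic add.
    read.inc_by(8192);
    assert_eq!(read.get(), 8192);
}
```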
diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py
index bf87cb3ad4..8021bf9914 100644
--- a/test_runner/regress/test_tenants.py
+++ b/test_runner/regress/test_tenants.py
@@ -3,6 +3,7 @@ import shutil
 import time
 from contextlib import closing
 from datetime import datetime
+from itertools import chain
 from pathlib import Path
 from typing import List
 
@@ -87,6 +88,7 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
 
 def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
     neon_env_builder.num_safekeepers = 3
+    neon_env_builder.pageserver_config_override = "availability_zone='test_ps_az'"
     env = neon_env_builder.init_start()
 
     tenant_1, _ = env.neon_cli.create_tenant()
@@ -122,6 +124,17 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
     ps_metrics = all_metrics[0]
     sk_metrics = all_metrics[1:]
 
+    # Find all metrics among all safekeepers, accepts the same arguments as query_all()
+    def query_all_safekeepers(name, filter):
+        return list(
+            chain.from_iterable(
+                map(
+                    lambda sk: sk.query_all(name, filter),
+                    sk_metrics,
+                )
+            )
+        )
+
     ttids = [
         {"tenant_id": str(tenant_1), "timeline_id": str(timeline_1)},
         {"tenant_id": str(tenant_2), "timeline_id": str(timeline_2)},
@@ -162,6 +175,40 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
         f"process_start_time_seconds (UTC): {datetime.fromtimestamp(metrics.query_one('process_start_time_seconds').value)}"
     )
 
+    for io_direction in ["read", "write"]:
+        # Querying all metrics for number of bytes read/written by pageserver in another AZ
+        io_metrics = query_all_safekeepers(
+            "safekeeper_pg_io_bytes_total",
+            {
+                "app_name": "pageserver",
+                "client_az": "test_ps_az",
+                "dir": io_direction,
+                "same_az": "false",
+            },
+        )
+        total_bytes = sum(int(metric.value) for metric in io_metrics)
+        log.info(f"Pageserver {io_direction} bytes from another AZ: {total_bytes}")
+        # We expect some bytes to be read/written, to make sure metrics are working
+        assert total_bytes > 0
+
+    # Test (a subset of) safekeeper global metrics
+    for sk_m in sk_metrics:
+        # Test that every safekeeper has read some bytes
+        assert any(
+            map(
+                lambda x: x.value > 0,
+                sk_m.query_all("safekeeper_pg_io_bytes_total", {"dir": "read"}),
+            )
+        ), f"{sk_m.name} has not read bytes"
+
+        # Test that every safekeeper has written some bytes
+        assert any(
+            map(
+                lambda x: x.value > 0,
+                sk_m.query_all("safekeeper_pg_io_bytes_total", {"dir": "write"}),
+            )
+        ), f"{sk_m.name} has not written bytes"
+
     # Test (a subset of) pageserver global metrics
     for metric in PAGESERVER_GLOBAL_METRICS:
         ps_samples = ps_metrics.query_all(metric, {})