diff --git a/Cargo.lock b/Cargo.lock index bceb2b1e8f..59adf696a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -869,6 +869,19 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "consumption_metrics" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "rand", + "serde", + "serde_with", + "utils", + "workspace_hack", +] + [[package]] name = "control_plane" version = "0.1.0" @@ -2228,6 +2241,7 @@ dependencies = [ "clap 4.0.32", "close_fds", "const_format", + "consumption_metrics", "crc32c", "criterion", "crossbeam-utils", @@ -2669,12 +2683,16 @@ dependencies = [ "base64 0.13.1", "bstr", "bytes", + "chrono", "clap 4.0.32", + "consumption_metrics", "futures", "git-version", "hashbrown 0.13.2", "hex", "hmac", + "hostname", + "humantime", "hyper", "hyper-tungstenite", "itertools", @@ -2684,6 +2702,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "pq_proto", + "prometheus", "rand", "rcgen", "regex", diff --git a/Cargo.toml b/Cargo.toml index 264d836c81..74cc16d690 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ hashbrown = "0.13" hex = "0.4" hex-literal = "0.3" hmac = "0.12.1" +hostname = "0.3.1" humantime = "2.1" humantime-serde = "1.1.1" hyper = "0.14" @@ -117,6 +118,7 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", re tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } ## Local libraries +consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } metrics = { version = "0.1", path = "./libs/metrics/" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" } diff --git a/libs/consumption_metrics/Cargo.toml b/libs/consumption_metrics/Cargo.toml new file mode 100644 index 0000000000..f26aa2fbc5 --- /dev/null +++ b/libs/consumption_metrics/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "consumption_metrics" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.68" +chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] } +rand = "0.8.3" +serde = "1.0.152" +serde_with = "2.1.0" +utils = { version = "0.1.0", path = "../utils" } +workspace_hack = { version = "0.1.0", path = "../../workspace_hack" } diff --git a/libs/consumption_metrics/src/lib.rs b/libs/consumption_metrics/src/lib.rs new file mode 100644 index 0000000000..3aac00662d --- /dev/null +++ b/libs/consumption_metrics/src/lib.rs @@ -0,0 +1,50 @@ +//! +//! Shared code for consumption metics collection +//! 
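+//!
+//! As a rough illustration (the values below are examples and exact field order
+//! may differ), an absolute pageserver event serializes to JSON like:
+//!
+//! ```json
+//! {
+//!     "type": "absolute",
+//!     "time": "2022-12-28T11:07:19.317310284Z",
+//!     "metric": "remote_storage_size",
+//!     "idempotency_key": "2022-12-28 11:07:19.317310324 UTC-1-4019",
+//!     "value": 12345454,
+//!     "tenant_id": "5d07d9ce9237c4cd845ea7918c0afa7d",
+//!     "timeline_id": "a03ebb4f5922a1c56ff7485cc8854143"
+//! }
+//! ```
+//!
+//! The trailing `tenant_id`/`timeline_id` fields come from the caller-provided
+//! `extra` payload that is flattened into the event; incremental events carry
+//! `start_time`/`stop_time` instead of `time`.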
+use chrono::{DateTime, Utc}; +use rand::Rng; +use serde::Serialize; + +#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] +#[serde(tag = "type")] +pub enum EventType { + #[serde(rename = "absolute")] + Absolute { time: DateTime }, + #[serde(rename = "incremental")] + Incremental { + start_time: DateTime, + stop_time: DateTime, + }, +} + +#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] +pub struct Event { + #[serde(flatten)] + #[serde(rename = "type")] + pub kind: EventType, + + pub metric: &'static str, + pub idempotency_key: String, + pub value: u64, + + #[serde(flatten)] + pub extra: Extra, +} + +pub fn idempotency_key(node_id: String) -> String { + format!( + "{}-{}-{:04}", + Utc::now(), + node_id, + rand::thread_rng().gen_range(0..=9999) + ) +} + +pub const CHUNK_SIZE: usize = 1000; + +// Just a wrapper around a slice of events +// to serialize it as `{"events" : [ ] } +#[derive(serde::Serialize)] +pub struct EventChunk<'a, T> { + pub events: &'a [T], +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 34de250bb4..cb9e4478bf 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -21,6 +21,7 @@ chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["string"] } close_fds.workspace = true const_format.workspace = true +consumption_metrics.workspace = true crc32c.workspace = true crossbeam-utils.workspace = true fail.workspace = true diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 423cb6c1d6..f8a0bc6f08 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -3,142 +3,42 @@ //! and push them to a HTTP endpoint. //! Cache metrics to send only the updated ones. //! - -use anyhow; -use tracing::*; -use utils::id::NodeId; -use utils::id::TimelineId; - -use crate::task_mgr; -use crate::task_mgr::TaskKind; -use crate::task_mgr::BACKGROUND_RUNTIME; - +use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; use crate::tenant::mgr; +use anyhow; +use chrono::Utc; +use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; use pageserver_api::models::TenantState; -use utils::id::TenantId; - -use serde::{Deserialize, Serialize}; +use reqwest::Url; +use serde::Serialize; use serde_with::{serde_as, DisplayFromStr}; use std::collections::HashMap; -use std::fmt; -use std::str::FromStr; use std::time::Duration; +use tracing::*; +use utils::id::{NodeId, TenantId, TimelineId}; -use chrono::{DateTime, Utc}; -use rand::Rng; -use reqwest::Url; +const WRITTEN_SIZE: &str = "written_size"; +const SYNTHETIC_STORAGE_SIZE: &str = "synthetic_storage_size"; +const RESIDENT_SIZE: &str = "resident_size"; +const REMOTE_STORAGE_SIZE: &str = "remote_storage_size"; +const TIMELINE_LOGICAL_SIZE: &str = "timeline_logical_size"; -/// ConsumptionMetric struct that defines the format for one metric entry -/// i.e. 
-/// -/// ```json -/// { -/// "metric": "remote_storage_size", -/// "type": "absolute", -/// "tenant_id": "5d07d9ce9237c4cd845ea7918c0afa7d", -/// "timeline_id": "a03ebb4f5922a1c56ff7485cc8854143", -/// "time": "2022-12-28T11:07:19.317310284Z", -/// "idempotency_key": "2022-12-28 11:07:19.317310324 UTC-1-4019", -/// "value": 12345454, -/// } -/// ``` #[serde_as] -#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] -pub struct ConsumptionMetric { - pub metric: ConsumptionMetricKind, - #[serde(rename = "type")] - pub metric_type: &'static str, +#[derive(Serialize)] +struct Ids { #[serde_as(as = "DisplayFromStr")] - pub tenant_id: TenantId, + tenant_id: TenantId, #[serde_as(as = "Option")] #[serde(skip_serializing_if = "Option::is_none")] - pub timeline_id: Option, - pub time: DateTime, - pub idempotency_key: String, - pub value: u64, -} - -impl ConsumptionMetric { - pub fn new_absolute( - metric: ConsumptionMetricKind, - tenant_id: TenantId, - timeline_id: Option, - value: u64, - node_id: NodeId, - rng: &mut R, - ) -> Self { - Self { - metric, - metric_type: "absolute", - tenant_id, - timeline_id, - time: Utc::now(), - // key that allows metric collector to distinguish unique events - idempotency_key: format!("{}-{}-{:04}", Utc::now(), node_id, rng.gen_range(0..=9999)), - value, - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum ConsumptionMetricKind { - /// Amount of WAL produced , by a timeline, i.e. last_record_lsn - /// This is an absolute, per-timeline metric. - WrittenSize, - /// Size of all tenant branches including WAL - /// This is an absolute, per-tenant metric. - /// This is the same metric that tenant/tenant_id/size endpoint returns. - SyntheticStorageSize, - /// Size of all the layer files in the tenant's directory on disk on the pageserver. - /// This is an absolute, per-tenant metric. - /// See also prometheus metric RESIDENT_PHYSICAL_SIZE. - ResidentSize, - /// Size of the remote storage (S3) directory. - /// This is an absolute, per-tenant metric. - RemoteStorageSize, - /// Logical size of the data in the timeline - /// This is an absolute, per-timeline metric - TimelineLogicalSize, -} - -impl FromStr for ConsumptionMetricKind { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - match s { - "written_size" => Ok(Self::WrittenSize), - "synthetic_storage_size" => Ok(Self::SyntheticStorageSize), - "resident_size" => Ok(Self::ResidentSize), - "remote_storage_size" => Ok(Self::RemoteStorageSize), - "timeline_logical_size" => Ok(Self::TimelineLogicalSize), - _ => anyhow::bail!("invalid value \"{s}\" for metric type"), - } - } -} - -impl fmt::Display for ConsumptionMetricKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(match self { - ConsumptionMetricKind::WrittenSize => "written_size", - ConsumptionMetricKind::SyntheticStorageSize => "synthetic_storage_size", - ConsumptionMetricKind::ResidentSize => "resident_size", - ConsumptionMetricKind::RemoteStorageSize => "remote_storage_size", - ConsumptionMetricKind::TimelineLogicalSize => "timeline_logical_size", - }) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ConsumptionMetricsKey { - tenant_id: TenantId, timeline_id: Option, - metric: ConsumptionMetricKind, } -#[derive(serde::Serialize)] -struct EventChunk<'a> { - events: &'a [ConsumptionMetric], +/// Key that uniquely identifies the object, this metric describes. 
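+/// A key is a tenant (plus an optional timeline) together with the metric name,
+/// so per-tenant and per-timeline values of the same metric are cached separately.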
+#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PageserverConsumptionMetricsKey { + pub tenant_id: TenantId, + pub timeline_id: Option, + pub metric: &'static str, } /// Main thread that serves metrics collection @@ -170,7 +70,7 @@ pub async fn collect_metrics( // define client here to reuse it for all requests let client = reqwest::Client::new(); - let mut cached_metrics: HashMap = HashMap::new(); + let mut cached_metrics: HashMap = HashMap::new(); loop { tokio::select! { @@ -179,7 +79,7 @@ pub async fn collect_metrics( return Ok(()); }, _ = ticker.tick() => { - if let Err(err) = collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint, node_id).await + if let Err(err) = collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id).await { error!("metrics collection failed: {err:?}"); } @@ -192,15 +92,20 @@ pub async fn collect_metrics( /// /// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`. /// Cache metrics to avoid sending the same metrics multiple times. -pub async fn collect_metrics_task( +/// +/// TODO +/// - refactor this function (chunking+sending part) to reuse it in proxy module; +/// - improve error handling. Now if one tenant fails to collect metrics, +/// the whole iteration fails and metrics for other tenants are not collected. +pub async fn collect_metrics_iteration( client: &reqwest::Client, - cached_metrics: &mut HashMap, + cached_metrics: &mut HashMap, metric_collection_endpoint: &reqwest::Url, node_id: NodeId, ) -> anyhow::Result<()> { - let mut current_metrics: Vec<(ConsumptionMetricsKey, u64)> = Vec::new(); + let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new(); trace!( - "starting collect_metrics_task. metric_collection_endpoint: {}", + "starting collect_metrics_iteration. metric_collection_endpoint: {}", metric_collection_endpoint ); @@ -224,10 +129,10 @@ pub async fn collect_metrics_task( let timeline_written_size = u64::from(timeline.get_last_record_lsn()); current_metrics.push(( - ConsumptionMetricsKey { + PageserverConsumptionMetricsKey { tenant_id, timeline_id: Some(timeline.timeline_id), - metric: ConsumptionMetricKind::WrittenSize, + metric: WRITTEN_SIZE, }, timeline_written_size, )); @@ -236,10 +141,10 @@ pub async fn collect_metrics_task( // Only send timeline logical size when it is fully calculated. 
if is_exact { current_metrics.push(( - ConsumptionMetricsKey { + PageserverConsumptionMetricsKey { tenant_id, timeline_id: Some(timeline.timeline_id), - metric: ConsumptionMetricKind::TimelineLogicalSize, + metric: TIMELINE_LOGICAL_SIZE, }, timeline_logical_size, )); @@ -257,19 +162,19 @@ pub async fn collect_metrics_task( ); current_metrics.push(( - ConsumptionMetricsKey { + PageserverConsumptionMetricsKey { tenant_id, timeline_id: None, - metric: ConsumptionMetricKind::ResidentSize, + metric: RESIDENT_SIZE, }, tenant_resident_size, )); current_metrics.push(( - ConsumptionMetricsKey { + PageserverConsumptionMetricsKey { tenant_id, timeline_id: None, - metric: ConsumptionMetricKind::RemoteStorageSize, + metric: REMOTE_STORAGE_SIZE, }, tenant_remote_size, )); @@ -278,10 +183,10 @@ pub async fn collect_metrics_task( // Here we only use cached value, which may lag behind the real latest one let tenant_synthetic_size = tenant.get_cached_synthetic_size(); current_metrics.push(( - ConsumptionMetricsKey { + PageserverConsumptionMetricsKey { tenant_id, timeline_id: None, - metric: ConsumptionMetricKind::SyntheticStorageSize, + metric: SYNTHETIC_STORAGE_SIZE, }, tenant_synthetic_size, )); @@ -300,35 +205,29 @@ pub async fn collect_metrics_task( // Send metrics. // Split into chunks of 1000 metrics to avoid exceeding the max request size - const CHUNK_SIZE: usize = 1000; let chunks = current_metrics.chunks(CHUNK_SIZE); - let mut chunk_to_send: Vec = Vec::with_capacity(1000); + let mut chunk_to_send: Vec> = Vec::with_capacity(CHUNK_SIZE); for chunk in chunks { chunk_to_send.clear(); - // this code block is needed to convince compiler - // that rng is not reused aroung await point - { - // enrich metrics with timestamp and metric_kind before sending - let mut rng = rand::thread_rng(); - chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| { - ConsumptionMetric::new_absolute( - curr_key.metric, - curr_key.tenant_id, - curr_key.timeline_id, - *curr_val, - node_id, - &mut rng, - ) - })); - } + // enrich metrics with type,timestamp and idempotency key before sending + chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| Event { + kind: EventType::Absolute { time: Utc::now() }, + metric: curr_key.metric, + idempotency_key: idempotency_key(node_id.to_string()), + value: *curr_val, + extra: Ids { + tenant_id: curr_key.tenant_id, + timeline_id: curr_key.timeline_id, + }, + })); let chunk_json = serde_json::value::to_raw_value(&EventChunk { events: &chunk_to_send, }) - .expect("ConsumptionMetric should not fail serialization"); + .expect("PageserverConsumptionMetric should not fail serialization"); let res = client .post(metric_collection_endpoint.clone()) diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index a9168daeca..03a6ddac5d 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -11,6 +11,8 @@ base64.workspace = true bstr.workspace = true bytes = {workspace = true, features = ['serde'] } clap.workspace = true +chrono.workspace = true +consumption_metrics.workspace = true futures.workspace = true git-version.workspace = true hashbrown.workspace = true @@ -48,6 +50,9 @@ x509-parser.workspace = true metrics.workspace = true pq_proto.workspace = true utils.workspace = true +prometheus.workspace = true +humantime.workspace = true +hostname.workspace = true workspace_hack.workspace = true diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 031fa84509..33a8fff847 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -5,6 +5,12 @@ use std::sync::Arc; pub struct ProxyConfig { 
pub tls_config: Option, pub auth_backend: auth::BackendType<'static, ()>, + pub metric_collection_config: Option, +} + +pub struct MetricCollectionConfig { + pub endpoint: reqwest::Url, + pub interval: std::time::Duration, } pub struct TlsConfig { diff --git a/proxy/src/main.rs b/proxy/src/main.rs index aa6766c102..5d44774df9 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -11,6 +11,7 @@ mod config; mod console; mod error; mod http; +mod metrics; mod mgmt; mod parse; mod proxy; @@ -20,14 +21,14 @@ mod stream; mod url; mod waiters; +use ::metrics::set_build_info_metric; use anyhow::{bail, Context}; use clap::{self, Arg}; use config::ProxyConfig; use futures::FutureExt; -use metrics::set_build_info_metric; use std::{borrow::Cow, future::Future, net::SocketAddr}; use tokio::{net::TcpListener, task::JoinError}; -use tracing::info; +use tracing::{info, info_span, Instrument}; use utils::project_git_version; use utils::sentry_init::{init_sentry, release_name}; @@ -65,6 +66,22 @@ async fn main() -> anyhow::Result<()> { let mgmt_address: SocketAddr = arg_matches.get_one::("mgmt").unwrap().parse()?; let http_address: SocketAddr = arg_matches.get_one::("http").unwrap().parse()?; + let metric_collection_config = match + ( + arg_matches.get_one::("metric-collection-endpoint"), + arg_matches.get_one::("metric-collection-interval"), + ) { + + (Some(endpoint), Some(interval)) => { + Some(config::MetricCollectionConfig { + endpoint: endpoint.parse()?, + interval: humantime::parse_duration(interval)?, + }) + } + (None, None) => None, + _ => bail!("either both or neither metric-collection-endpoint and metric-collection-interval must be specified"), + }; + let auth_backend = match arg_matches .get_one::("auth-backend") .unwrap() @@ -95,6 +112,7 @@ async fn main() -> anyhow::Result<()> { let config: &ProxyConfig = Box::leak(Box::new(ProxyConfig { tls_config, auth_backend, + metric_collection_config, })); info!("Version: {GIT_VERSION}"); @@ -126,6 +144,21 @@ async fn main() -> anyhow::Result<()> { ))); } + if let Some(metric_collection_config) = &config.metric_collection_config { + let hostname = hostname::get()? + .into_string() + .map_err(|e| anyhow::anyhow!("failed to get hostname {e:?}"))?; + + tasks.push(tokio::spawn( + metrics::collect_metrics( + &metric_collection_config.endpoint, + metric_collection_config.interval, + hostname, + ) + .instrument(info_span!("collect_metrics")), + )); + } + let tasks = tasks.into_iter().map(flatten_err); set_build_info_metric(GIT_VERSION); @@ -199,6 +232,16 @@ fn cli() -> clap::Command { .alias("ssl-cert") // backwards compatibility .help("path to TLS cert for client postgres connections"), ) + .arg( + Arg::new("metric-collection-endpoint") + .long("metric-collection-endpoint") + .help("metric collection HTTP endpoint"), + ) + .arg( + Arg::new("metric-collection-interval") + .long("metric-collection-interval") + .help("metric collection interval"), + ) } #[test] diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs new file mode 100644 index 0000000000..d9aa4aec8c --- /dev/null +++ b/proxy/src/metrics.rs @@ -0,0 +1,196 @@ +//! +//! Periodically collect proxy consumption metrics +//! and push them to a HTTP endpoint. +//! 
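+//!
+//! Unlike the pageserver, which reports absolute values, the proxy reports
+//! `proxy_io_bytes_per_client` as incremental events: each event carries the
+//! delta accumulated since the last successfully pushed value, with
+//! `start_time`/`stop_time` marking the covered interval. The raw counters are
+//! read from the local prometheus registry.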
+use chrono::{DateTime, Utc}; +use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; +use serde::Serialize; +use std::{collections::HashMap, time::Duration}; +use tracing::{debug, error, log::info, trace}; + +const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client"; + +/// +/// Key that uniquely identifies the object, this metric describes. +/// Currently, endpoint_id is enough, but this may change later, +/// so keep it in a named struct. +/// +/// Both the proxy and the ingestion endpoint will live in the same region (or cell) +/// so while the project-id is unique across regions the whole pipeline will work correctly +/// because we enrich the event with project_id in the control-plane endpoint. +/// +#[derive(Eq, Hash, PartialEq, Serialize)] +pub struct Ids { + pub endpoint_id: String, +} + +pub async fn collect_metrics( + metric_collection_endpoint: &reqwest::Url, + metric_collection_interval: Duration, + hostname: String, +) -> anyhow::Result<()> { + scopeguard::defer! { + info!("collect_metrics has shut down"); + } + + let mut ticker = tokio::time::interval(metric_collection_interval); + + info!( + "starting collect_metrics. metric_collection_endpoint: {}", + metric_collection_endpoint + ); + + // define client here to reuse it for all requests + let client = reqwest::Client::new(); + let mut cached_metrics: HashMap)> = HashMap::new(); + + loop { + tokio::select! { + _ = ticker.tick() => { + + match collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, hostname.clone()).await + { + Err(e) => { + error!("Failed to send consumption metrics: {} ", e); + }, + Ok(_) => { trace!("collect_metrics_iteration completed successfully") }, + } + } + } + } +} + +pub fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime))> { + let mut current_metrics: Vec<(Ids, (u64, DateTime))> = Vec::new(); + let metrics = prometheus::default_registry().gather(); + + for m in metrics { + if m.get_name() == "proxy_io_bytes_per_client" { + for ms in m.get_metric() { + let direction = ms + .get_label() + .iter() + .find(|l| l.get_name() == "direction") + .unwrap() + .get_value(); + + // Only collect metric for outbound traffic + if direction == "tx" { + let endpoint_id = ms + .get_label() + .iter() + .find(|l| l.get_name() == "endpoint_id") + .unwrap() + .get_value(); + let value = ms.get_counter().get_value() as u64; + + debug!("endpoint_id:val - {}: {}", endpoint_id, value); + current_metrics.push(( + Ids { + endpoint_id: endpoint_id.to_string(), + }, + (value, Utc::now()), + )); + } + } + } + } + + current_metrics +} + +pub async fn collect_metrics_iteration( + client: &reqwest::Client, + cached_metrics: &mut HashMap)>, + metric_collection_endpoint: &reqwest::Url, + hostname: String, +) -> anyhow::Result<()> { + info!( + "starting collect_metrics_iteration. 
metric_collection_endpoint: {}", + metric_collection_endpoint + ); + + let current_metrics = gather_proxy_io_bytes_per_client(); + + let metrics_to_send: Vec> = current_metrics + .iter() + .filter_map(|(curr_key, (curr_val, curr_time))| { + let mut start_time = *curr_time; + let mut value = *curr_val; + + if let Some((prev_val, prev_time)) = cached_metrics.get(curr_key) { + // Only send metrics updates if the metric has changed + if curr_val - prev_val > 0 { + value = curr_val - prev_val; + start_time = *prev_time; + } else { + return None; + } + }; + + Some(Event { + kind: EventType::Incremental { + start_time, + stop_time: *curr_time, + }, + metric: PROXY_IO_BYTES_PER_CLIENT, + idempotency_key: idempotency_key(hostname.clone()), + value, + extra: Ids { + endpoint_id: curr_key.endpoint_id.clone(), + }, + }) + }) + .collect(); + + if metrics_to_send.is_empty() { + trace!("no new metrics to send"); + return Ok(()); + } + + // Send metrics. + // Split into chunks of 1000 metrics to avoid exceeding the max request size + for chunk in metrics_to_send.chunks(CHUNK_SIZE) { + let chunk_json = serde_json::value::to_raw_value(&EventChunk { events: chunk }) + .expect("ProxyConsumptionMetric should not fail serialization"); + + let res = client + .post(metric_collection_endpoint.clone()) + .json(&chunk_json) + .send() + .await; + + let res = match res { + Ok(x) => x, + Err(err) => { + error!("failed to send metrics: {:?}", err); + continue; + } + }; + + if res.status().is_success() { + // update cached metrics after they were sent successfully + for send_metric in chunk { + let stop_time = match send_metric.kind { + EventType::Incremental { stop_time, .. } => stop_time, + _ => unreachable!(), + }; + + cached_metrics + .entry(Ids { + endpoint_id: send_metric.extra.endpoint_id.clone(), + }) + // update cached value (add delta) and time + .and_modify(|e| { + e.0 += send_metric.value; + e.1 = stop_time + }) + // cache new metric + .or_insert((send_metric.value, stop_time)); + } + } else { + error!("metrics endpoint refused the sent metrics: {:?}", res); + } + } + Ok(()) +} diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index f284be8753..5831222fda 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -22,6 +22,7 @@ from itertools import chain, product from pathlib import Path from types import TracebackType from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast +from urllib.parse import urlparse import asyncpg import backoff # type: ignore @@ -2323,6 +2324,8 @@ class NeonProxy(PgProtocol): http_port: int, mgmt_port: int, auth_backend: NeonProxy.AuthBackend, + metric_collection_endpoint: Optional[str] = None, + metric_collection_interval: Optional[str] = None, ): host = "127.0.0.1" super().__init__(dsn=auth_backend.default_conn_url, host=host, port=proxy_port) @@ -2333,6 +2336,8 @@ class NeonProxy(PgProtocol): self.proxy_port = proxy_port self.mgmt_port = mgmt_port self.auth_backend = auth_backend + self.metric_collection_endpoint = metric_collection_endpoint + self.metric_collection_interval = metric_collection_interval self._popen: Optional[subprocess.Popen[bytes]] = None def start(self) -> NeonProxy: @@ -2344,6 +2349,16 @@ class NeonProxy(PgProtocol): *["--mgmt", f"{self.host}:{self.mgmt_port}"], *self.auth_backend.extra_args(), ] + + if ( + self.metric_collection_endpoint is not None + and self.metric_collection_interval is not None + ): + args += [ + *["--metric-collection-endpoint", 
self.metric_collection_endpoint], + *["--metric-collection-interval", self.metric_collection_interval], + ] + self._popen = subprocess.Popen(args) self._wait_until_ready() return self @@ -2357,6 +2372,25 @@ class NeonProxy(PgProtocol): request_result.raise_for_status() return request_result.text + @staticmethod + def get_session_id(uri_prefix, uri_line): + assert uri_prefix in uri_line + + url_parts = urlparse(uri_line) + psql_session_id = url_parts.path[1:] + assert psql_session_id.isalnum(), "session_id should only contain alphanumeric chars" + + return psql_session_id + + @staticmethod + async def find_auth_link(link_auth_uri, proc): + for _ in range(100): + line = (await proc.stderr.readline()).decode("utf-8").strip() + log.info(f"psql line: {line}") + if link_auth_uri in line: + log.info(f"SUCCESS, found auth url: {line}") + return line + def __enter__(self) -> NeonProxy: return self @@ -2371,6 +2405,46 @@ class NeonProxy(PgProtocol): # it's a child process. This is mostly to clean up in between different tests. self._popen.kill() + @staticmethod + async def activate_link_auth( + local_vanilla_pg, proxy_with_metric_collector, psql_session_id, create_user=True + ): + + pg_user = "proxy" + + if create_user: + log.info("creating a new user for link auth test") + local_vanilla_pg.start() + local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser") + + db_info = json.dumps( + { + "session_id": psql_session_id, + "result": { + "Success": { + "host": local_vanilla_pg.default_options["host"], + "port": local_vanilla_pg.default_options["port"], + "dbname": local_vanilla_pg.default_options["dbname"], + "user": pg_user, + "aux": { + "project_id": "test_project_id", + "endpoint_id": "test_endpoint_id", + "branch_id": "test_branch_id", + }, + } + }, + } + ) + + log.info("sending session activation message") + psql = await PSQL( + host=proxy_with_metric_collector.host, + port=proxy_with_metric_collector.mgmt_port, + ).run(db_info) + assert psql.stdout is not None + out = (await psql.stdout.read()).decode("utf-8").strip() + assert out == "ok" + @pytest.fixture(scope="function") def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterator[NeonProxy]: diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index d1fcab7a62..3f252992f5 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -1,12 +1,22 @@ +# +# Test for collecting metrics from pageserver and proxy. +# Use mock HTTP server to receive metrics and verify that they look sane. 
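+# The proxy test drives psql through link auth against a vanilla Postgres so that
+# real traffic flows through the proxy and is reported as proxy_io_bytes_per_client.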
+# + import time +from pathlib import Path +from typing import Iterator import pytest from fixtures.log_helper import log from fixtures.metrics import parse_metrics from fixtures.neon_fixtures import ( + PSQL, NeonEnvBuilder, + NeonProxy, PortDistributor, RemoteStorageKind, + VanillaPostgres, wait_for_last_flush_lsn, ) from fixtures.types import TenantId, TimelineId @@ -22,6 +32,10 @@ def httpserver_listen_address(port_distributor: PortDistributor): return ("localhost", port) +# ============================================================================== +# Storage metrics tests +# ============================================================================== + initial_tenant = TenantId.generate() remote_uploaded = 0 checks = { @@ -161,3 +175,102 @@ def test_metric_collection( assert len(metric_kinds_checked) == len( checks ), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered" + + +# ============================================================================== +# Proxy metrics tests +# ============================================================================== + + +def proxy_metrics_handler(request: Request) -> Response: + if request.json is None: + return Response(status=400) + + events = request.json["events"] + log.info("received events:") + log.info(events) + + # perform basic sanity checks + for event in events: + assert event["metric"] == "proxy_io_bytes_per_client" + assert event["endpoint_id"] == "test_endpoint_id" + assert event["value"] >= 0 + assert event["stop_time"] >= event["start_time"] + + return Response(status=200) + + +@pytest.fixture(scope="session") +def proxy_with_metric_collector( + port_distributor: PortDistributor, neon_binpath: Path, httpserver_listen_address +) -> Iterator[NeonProxy]: + """Neon proxy that routes through link auth and has metric collection enabled.""" + + http_port = port_distributor.get_port() + proxy_port = port_distributor.get_port() + mgmt_port = port_distributor.get_port() + + (host, port) = httpserver_listen_address + metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events" + metric_collection_interval = "5s" + + with NeonProxy( + neon_binpath=neon_binpath, + proxy_port=proxy_port, + http_port=http_port, + mgmt_port=mgmt_port, + metric_collection_endpoint=metric_collection_endpoint, + metric_collection_interval=metric_collection_interval, + auth_backend=NeonProxy.Link(), + ) as proxy: + proxy.start() + yield proxy + + +@pytest.mark.asyncio +async def test_proxy_metric_collection( + httpserver: HTTPServer, + httpserver_listen_address, + proxy_with_metric_collector: NeonProxy, + vanilla_pg: VanillaPostgres, +): + # mock http server that returns OK for the metrics + httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler( + proxy_metrics_handler + ) + + # do something to generate load to generate metrics + # sleep for 5 seconds to give metric collector time to collect metrics + psql = await PSQL( + host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port + ).run( + "create table tbl as select * from generate_series(0,1000); select pg_sleep(5); select 42" + ) + + base_uri = proxy_with_metric_collector.link_auth_uri + link = await NeonProxy.find_auth_link(base_uri, psql) + + psql_session_id = NeonProxy.get_session_id(base_uri, link) + await NeonProxy.activate_link_auth(vanilla_pg, proxy_with_metric_collector, psql_session_id) + + assert psql.stdout is not None + out = (await 
psql.stdout.read()).decode("utf-8").strip() + assert out == "42" + + # do something to generate load to generate metrics + # sleep for 5 seconds to give metric collector time to collect metrics + psql = await PSQL( + host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port + ).run("insert into tbl select * from generate_series(0,1000); select pg_sleep(5); select 42") + + link = await NeonProxy.find_auth_link(base_uri, psql) + psql_session_id = NeonProxy.get_session_id(base_uri, link) + await NeonProxy.activate_link_auth( + vanilla_pg, proxy_with_metric_collector, psql_session_id, create_user=False + ) + + assert psql.stdout is not None + out = (await psql.stdout.read()).decode("utf-8").strip() + assert out == "42" + + httpserver.check() diff --git a/test_runner/regress/test_proxy.py b/test_runner/regress/test_proxy.py index e13ba51f4b..99a3f2fa86 100644 --- a/test_runner/regress/test_proxy.py +++ b/test_runner/regress/test_proxy.py @@ -1,9 +1,5 @@ -import json -from urllib.parse import urlparse - import psycopg2 import pytest -from fixtures.log_helper import log from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres @@ -30,62 +26,14 @@ def test_password_hack(static_proxy: NeonProxy): @pytest.mark.asyncio async def test_psql_session_id(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy): - def get_session_id(uri_prefix, uri_line): - assert uri_prefix in uri_line - - url_parts = urlparse(uri_line) - psql_session_id = url_parts.path[1:] - assert psql_session_id.isalnum(), "session_id should only contain alphanumeric chars" - - return psql_session_id - - async def find_auth_link(link_auth_uri, proc): - for _ in range(100): - line = (await proc.stderr.readline()).decode("utf-8").strip() - log.info(f"psql line: {line}") - if link_auth_uri in line: - log.info(f"SUCCESS, found auth url: {line}") - return line - - async def activate_link_auth(local_vanilla_pg, link_proxy, psql_session_id): - pg_user = "proxy" - - log.info("creating a new user for link auth test") - local_vanilla_pg.start() - local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser") - - db_info = json.dumps( - { - "session_id": psql_session_id, - "result": { - "Success": { - "host": local_vanilla_pg.default_options["host"], - "port": local_vanilla_pg.default_options["port"], - "dbname": local_vanilla_pg.default_options["dbname"], - "user": pg_user, - "aux": { - "project_id": "project", - "endpoint_id": "endpoint", - "branch_id": "branch", - }, - } - }, - } - ) - - log.info("sending session activation message") - psql = await PSQL(host=link_proxy.host, port=link_proxy.mgmt_port).run(db_info) - assert psql.stdout is not None - out = (await psql.stdout.read()).decode("utf-8").strip() - assert out == "ok" psql = await PSQL(host=link_proxy.host, port=link_proxy.proxy_port).run("select 42") base_uri = link_proxy.link_auth_uri - link = await find_auth_link(base_uri, psql) + link = await NeonProxy.find_auth_link(base_uri, psql) - psql_session_id = get_session_id(base_uri, link) - await activate_link_auth(vanilla_pg, link_proxy, psql_session_id) + psql_session_id = NeonProxy.get_session_id(base_uri, link) + await NeonProxy.activate_link_auth(vanilla_pg, link_proxy, psql_session_id) assert psql.stdout is not None out = (await psql.stdout.read()).decode("utf-8").strip()