Proxy metrics (#3290)

Implement proxy metrics collection.
Only collect metric for outbound traffic.

Add proxy CLI parameters:
- metric-collection-endpoint
- metric-collection-interval.

Add test_proxy_metric_collection test.

Move shared consumption metrics code to libs/consumption_metrics.
Refactor the code.
This commit is contained in:
Anastasia Lubennikova
2023-01-16 17:17:28 +02:00
committed by GitHub
parent 5c6a7a17cb
commit 2cbe84b78f
13 changed files with 586 additions and 214 deletions

19
Cargo.lock generated
View File

@@ -869,6 +869,19 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "consumption_metrics"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"rand",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "control_plane"
version = "0.1.0"
@@ -2228,6 +2241,7 @@ dependencies = [
"clap 4.0.32",
"close_fds",
"const_format",
"consumption_metrics",
"crc32c",
"criterion",
"crossbeam-utils",
@@ -2669,12 +2683,16 @@ dependencies = [
"base64 0.13.1",
"bstr",
"bytes",
"chrono",
"clap 4.0.32",
"consumption_metrics",
"futures",
"git-version",
"hashbrown 0.13.2",
"hex",
"hmac",
"hostname",
"humantime",
"hyper",
"hyper-tungstenite",
"itertools",
@@ -2684,6 +2702,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"pq_proto",
"prometheus",
"rand",
"rcgen",
"regex",

View File

@@ -47,6 +47,7 @@ hashbrown = "0.13"
hex = "0.4"
hex-literal = "0.3"
hmac = "0.12.1"
hostname = "0.3.1"
humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
@@ -117,6 +118,7 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", re
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Local libraries
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }

View File

@@ -0,0 +1,16 @@
[package]
name = "consumption_metrics"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.68"
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
rand = "0.8.3"
serde = "1.0.152"
serde_with = "2.1.0"
utils = { version = "0.1.0", path = "../utils" }
workspace_hack = { version = "0.1.0", path = "../../workspace_hack" }

View File

@@ -0,0 +1,50 @@
//!
//! Shared code for consumption metics collection
//!
use chrono::{DateTime, Utc};
use rand::Rng;
use serde::Serialize;
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
#[serde(tag = "type")]
pub enum EventType {
#[serde(rename = "absolute")]
Absolute { time: DateTime<Utc> },
#[serde(rename = "incremental")]
Incremental {
start_time: DateTime<Utc>,
stop_time: DateTime<Utc>,
},
}
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Event<Extra> {
#[serde(flatten)]
#[serde(rename = "type")]
pub kind: EventType,
pub metric: &'static str,
pub idempotency_key: String,
pub value: u64,
#[serde(flatten)]
pub extra: Extra,
}
pub fn idempotency_key(node_id: String) -> String {
format!(
"{}-{}-{:04}",
Utc::now(),
node_id,
rand::thread_rng().gen_range(0..=9999)
)
}
pub const CHUNK_SIZE: usize = 1000;
// Just a wrapper around a slice of events
// to serialize it as `{"events" : [ ] }
#[derive(serde::Serialize)]
pub struct EventChunk<'a, T> {
pub events: &'a [T],
}

View File

@@ -21,6 +21,7 @@ chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["string"] }
close_fds.workspace = true
const_format.workspace = true
consumption_metrics.workspace = true
crc32c.workspace = true
crossbeam-utils.workspace = true
fail.workspace = true

View File

@@ -3,142 +3,42 @@
//! and push them to a HTTP endpoint.
//! Cache metrics to send only the updated ones.
//!
use anyhow;
use tracing::*;
use utils::id::NodeId;
use utils::id::TimelineId;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::task_mgr::BACKGROUND_RUNTIME;
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::mgr;
use anyhow;
use chrono::Utc;
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use pageserver_api::models::TenantState;
use utils::id::TenantId;
use serde::{Deserialize, Serialize};
use reqwest::Url;
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::time::Duration;
use tracing::*;
use utils::id::{NodeId, TenantId, TimelineId};
use chrono::{DateTime, Utc};
use rand::Rng;
use reqwest::Url;
const WRITTEN_SIZE: &str = "written_size";
const SYNTHETIC_STORAGE_SIZE: &str = "synthetic_storage_size";
const RESIDENT_SIZE: &str = "resident_size";
const REMOTE_STORAGE_SIZE: &str = "remote_storage_size";
const TIMELINE_LOGICAL_SIZE: &str = "timeline_logical_size";
/// ConsumptionMetric struct that defines the format for one metric entry
/// i.e.
///
/// ```json
/// {
/// "metric": "remote_storage_size",
/// "type": "absolute",
/// "tenant_id": "5d07d9ce9237c4cd845ea7918c0afa7d",
/// "timeline_id": "a03ebb4f5922a1c56ff7485cc8854143",
/// "time": "2022-12-28T11:07:19.317310284Z",
/// "idempotency_key": "2022-12-28 11:07:19.317310324 UTC-1-4019",
/// "value": 12345454,
/// }
/// ```
#[serde_as]
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct ConsumptionMetric {
pub metric: ConsumptionMetricKind,
#[serde(rename = "type")]
pub metric_type: &'static str,
#[derive(Serialize)]
struct Ids {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
tenant_id: TenantId,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub timeline_id: Option<TimelineId>,
pub time: DateTime<Utc>,
pub idempotency_key: String,
pub value: u64,
}
impl ConsumptionMetric {
pub fn new_absolute<R: Rng + ?Sized>(
metric: ConsumptionMetricKind,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
value: u64,
node_id: NodeId,
rng: &mut R,
) -> Self {
Self {
metric,
metric_type: "absolute",
tenant_id,
timeline_id,
time: Utc::now(),
// key that allows metric collector to distinguish unique events
idempotency_key: format!("{}-{}-{:04}", Utc::now(), node_id, rng.gen_range(0..=9999)),
value,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConsumptionMetricKind {
/// Amount of WAL produced , by a timeline, i.e. last_record_lsn
/// This is an absolute, per-timeline metric.
WrittenSize,
/// Size of all tenant branches including WAL
/// This is an absolute, per-tenant metric.
/// This is the same metric that tenant/tenant_id/size endpoint returns.
SyntheticStorageSize,
/// Size of all the layer files in the tenant's directory on disk on the pageserver.
/// This is an absolute, per-tenant metric.
/// See also prometheus metric RESIDENT_PHYSICAL_SIZE.
ResidentSize,
/// Size of the remote storage (S3) directory.
/// This is an absolute, per-tenant metric.
RemoteStorageSize,
/// Logical size of the data in the timeline
/// This is an absolute, per-timeline metric
TimelineLogicalSize,
}
impl FromStr for ConsumptionMetricKind {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"written_size" => Ok(Self::WrittenSize),
"synthetic_storage_size" => Ok(Self::SyntheticStorageSize),
"resident_size" => Ok(Self::ResidentSize),
"remote_storage_size" => Ok(Self::RemoteStorageSize),
"timeline_logical_size" => Ok(Self::TimelineLogicalSize),
_ => anyhow::bail!("invalid value \"{s}\" for metric type"),
}
}
}
impl fmt::Display for ConsumptionMetricKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
ConsumptionMetricKind::WrittenSize => "written_size",
ConsumptionMetricKind::SyntheticStorageSize => "synthetic_storage_size",
ConsumptionMetricKind::ResidentSize => "resident_size",
ConsumptionMetricKind::RemoteStorageSize => "remote_storage_size",
ConsumptionMetricKind::TimelineLogicalSize => "timeline_logical_size",
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConsumptionMetricsKey {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
metric: ConsumptionMetricKind,
}
#[derive(serde::Serialize)]
struct EventChunk<'a> {
events: &'a [ConsumptionMetric],
/// Key that uniquely identifies the object, this metric describes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageserverConsumptionMetricsKey {
pub tenant_id: TenantId,
pub timeline_id: Option<TimelineId>,
pub metric: &'static str,
}
/// Main thread that serves metrics collection
@@ -170,7 +70,7 @@ pub async fn collect_metrics(
// define client here to reuse it for all requests
let client = reqwest::Client::new();
let mut cached_metrics: HashMap<ConsumptionMetricsKey, u64> = HashMap::new();
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
loop {
tokio::select! {
@@ -179,7 +79,7 @@ pub async fn collect_metrics(
return Ok(());
},
_ = ticker.tick() => {
if let Err(err) = collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint, node_id).await
if let Err(err) = collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id).await
{
error!("metrics collection failed: {err:?}");
}
@@ -192,15 +92,20 @@ pub async fn collect_metrics(
///
/// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`.
/// Cache metrics to avoid sending the same metrics multiple times.
pub async fn collect_metrics_task(
///
/// TODO
/// - refactor this function (chunking+sending part) to reuse it in proxy module;
/// - improve error handling. Now if one tenant fails to collect metrics,
/// the whole iteration fails and metrics for other tenants are not collected.
pub async fn collect_metrics_iteration(
client: &reqwest::Client,
cached_metrics: &mut HashMap<ConsumptionMetricsKey, u64>,
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, u64>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
) -> anyhow::Result<()> {
let mut current_metrics: Vec<(ConsumptionMetricsKey, u64)> = Vec::new();
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
trace!(
"starting collect_metrics_task. metric_collection_endpoint: {}",
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
);
@@ -224,10 +129,10 @@ pub async fn collect_metrics_task(
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
current_metrics.push((
ConsumptionMetricsKey {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: ConsumptionMetricKind::WrittenSize,
metric: WRITTEN_SIZE,
},
timeline_written_size,
));
@@ -236,10 +141,10 @@ pub async fn collect_metrics_task(
// Only send timeline logical size when it is fully calculated.
if is_exact {
current_metrics.push((
ConsumptionMetricsKey {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: ConsumptionMetricKind::TimelineLogicalSize,
metric: TIMELINE_LOGICAL_SIZE,
},
timeline_logical_size,
));
@@ -257,19 +162,19 @@ pub async fn collect_metrics_task(
);
current_metrics.push((
ConsumptionMetricsKey {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: ConsumptionMetricKind::ResidentSize,
metric: RESIDENT_SIZE,
},
tenant_resident_size,
));
current_metrics.push((
ConsumptionMetricsKey {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: ConsumptionMetricKind::RemoteStorageSize,
metric: REMOTE_STORAGE_SIZE,
},
tenant_remote_size,
));
@@ -278,10 +183,10 @@ pub async fn collect_metrics_task(
// Here we only use cached value, which may lag behind the real latest one
let tenant_synthetic_size = tenant.get_cached_synthetic_size();
current_metrics.push((
ConsumptionMetricsKey {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: ConsumptionMetricKind::SyntheticStorageSize,
metric: SYNTHETIC_STORAGE_SIZE,
},
tenant_synthetic_size,
));
@@ -300,35 +205,29 @@ pub async fn collect_metrics_task(
// Send metrics.
// Split into chunks of 1000 metrics to avoid exceeding the max request size
const CHUNK_SIZE: usize = 1000;
let chunks = current_metrics.chunks(CHUNK_SIZE);
let mut chunk_to_send: Vec<ConsumptionMetric> = Vec::with_capacity(1000);
let mut chunk_to_send: Vec<Event<Ids>> = Vec::with_capacity(CHUNK_SIZE);
for chunk in chunks {
chunk_to_send.clear();
// this code block is needed to convince compiler
// that rng is not reused aroung await point
{
// enrich metrics with timestamp and metric_kind before sending
let mut rng = rand::thread_rng();
chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| {
ConsumptionMetric::new_absolute(
curr_key.metric,
curr_key.tenant_id,
curr_key.timeline_id,
*curr_val,
node_id,
&mut rng,
)
}));
}
// enrich metrics with type,timestamp and idempotency key before sending
chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| Event {
kind: EventType::Absolute { time: Utc::now() },
metric: curr_key.metric,
idempotency_key: idempotency_key(node_id.to_string()),
value: *curr_val,
extra: Ids {
tenant_id: curr_key.tenant_id,
timeline_id: curr_key.timeline_id,
},
}));
let chunk_json = serde_json::value::to_raw_value(&EventChunk {
events: &chunk_to_send,
})
.expect("ConsumptionMetric should not fail serialization");
.expect("PageserverConsumptionMetric should not fail serialization");
let res = client
.post(metric_collection_endpoint.clone())

View File

@@ -11,6 +11,8 @@ base64.workspace = true
bstr.workspace = true
bytes = {workspace = true, features = ['serde'] }
clap.workspace = true
chrono.workspace = true
consumption_metrics.workspace = true
futures.workspace = true
git-version.workspace = true
hashbrown.workspace = true
@@ -48,6 +50,9 @@ x509-parser.workspace = true
metrics.workspace = true
pq_proto.workspace = true
utils.workspace = true
prometheus.workspace = true
humantime.workspace = true
hostname.workspace = true
workspace_hack.workspace = true

View File

@@ -5,6 +5,12 @@ use std::sync::Arc;
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub auth_backend: auth::BackendType<'static, ()>,
pub metric_collection_config: Option<MetricCollectionConfig>,
}
pub struct MetricCollectionConfig {
pub endpoint: reqwest::Url,
pub interval: std::time::Duration,
}
pub struct TlsConfig {

View File

@@ -11,6 +11,7 @@ mod config;
mod console;
mod error;
mod http;
mod metrics;
mod mgmt;
mod parse;
mod proxy;
@@ -20,14 +21,14 @@ mod stream;
mod url;
mod waiters;
use ::metrics::set_build_info_metric;
use anyhow::{bail, Context};
use clap::{self, Arg};
use config::ProxyConfig;
use futures::FutureExt;
use metrics::set_build_info_metric;
use std::{borrow::Cow, future::Future, net::SocketAddr};
use tokio::{net::TcpListener, task::JoinError};
use tracing::info;
use tracing::{info, info_span, Instrument};
use utils::project_git_version;
use utils::sentry_init::{init_sentry, release_name};
@@ -65,6 +66,22 @@ async fn main() -> anyhow::Result<()> {
let mgmt_address: SocketAddr = arg_matches.get_one::<String>("mgmt").unwrap().parse()?;
let http_address: SocketAddr = arg_matches.get_one::<String>("http").unwrap().parse()?;
let metric_collection_config = match
(
arg_matches.get_one::<String>("metric-collection-endpoint"),
arg_matches.get_one::<String>("metric-collection-interval"),
) {
(Some(endpoint), Some(interval)) => {
Some(config::MetricCollectionConfig {
endpoint: endpoint.parse()?,
interval: humantime::parse_duration(interval)?,
})
}
(None, None) => None,
_ => bail!("either both or neither metric-collection-endpoint and metric-collection-interval must be specified"),
};
let auth_backend = match arg_matches
.get_one::<String>("auth-backend")
.unwrap()
@@ -95,6 +112,7 @@ async fn main() -> anyhow::Result<()> {
let config: &ProxyConfig = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection_config,
}));
info!("Version: {GIT_VERSION}");
@@ -126,6 +144,21 @@ async fn main() -> anyhow::Result<()> {
)));
}
if let Some(metric_collection_config) = &config.metric_collection_config {
let hostname = hostname::get()?
.into_string()
.map_err(|e| anyhow::anyhow!("failed to get hostname {e:?}"))?;
tasks.push(tokio::spawn(
metrics::collect_metrics(
&metric_collection_config.endpoint,
metric_collection_config.interval,
hostname,
)
.instrument(info_span!("collect_metrics")),
));
}
let tasks = tasks.into_iter().map(flatten_err);
set_build_info_metric(GIT_VERSION);
@@ -199,6 +232,16 @@ fn cli() -> clap::Command {
.alias("ssl-cert") // backwards compatibility
.help("path to TLS cert for client postgres connections"),
)
.arg(
Arg::new("metric-collection-endpoint")
.long("metric-collection-endpoint")
.help("metric collection HTTP endpoint"),
)
.arg(
Arg::new("metric-collection-interval")
.long("metric-collection-interval")
.help("metric collection interval"),
)
}
#[test]

196
proxy/src/metrics.rs Normal file
View File

@@ -0,0 +1,196 @@
//!
//! Periodically collect proxy consumption metrics
//! and push them to a HTTP endpoint.
//!
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use serde::Serialize;
use std::{collections::HashMap, time::Duration};
use tracing::{debug, error, log::info, trace};
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
///
/// Key that uniquely identifies the object, this metric describes.
/// Currently, endpoint_id is enough, but this may change later,
/// so keep it in a named struct.
///
/// Both the proxy and the ingestion endpoint will live in the same region (or cell)
/// so while the project-id is unique across regions the whole pipeline will work correctly
/// because we enrich the event with project_id in the control-plane endpoint.
///
#[derive(Eq, Hash, PartialEq, Serialize)]
pub struct Ids {
pub endpoint_id: String,
}
pub async fn collect_metrics(
metric_collection_endpoint: &reqwest::Url,
metric_collection_interval: Duration,
hostname: String,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("collect_metrics has shut down");
}
let mut ticker = tokio::time::interval(metric_collection_interval);
info!(
"starting collect_metrics. metric_collection_endpoint: {}",
metric_collection_endpoint
);
// define client here to reuse it for all requests
let client = reqwest::Client::new();
let mut cached_metrics: HashMap<Ids, (u64, DateTime<Utc>)> = HashMap::new();
loop {
tokio::select! {
_ = ticker.tick() => {
match collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, hostname.clone()).await
{
Err(e) => {
error!("Failed to send consumption metrics: {} ", e);
},
Ok(_) => { trace!("collect_metrics_iteration completed successfully") },
}
}
}
}
}
pub fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
let mut current_metrics: Vec<(Ids, (u64, DateTime<Utc>))> = Vec::new();
let metrics = prometheus::default_registry().gather();
for m in metrics {
if m.get_name() == "proxy_io_bytes_per_client" {
for ms in m.get_metric() {
let direction = ms
.get_label()
.iter()
.find(|l| l.get_name() == "direction")
.unwrap()
.get_value();
// Only collect metric for outbound traffic
if direction == "tx" {
let endpoint_id = ms
.get_label()
.iter()
.find(|l| l.get_name() == "endpoint_id")
.unwrap()
.get_value();
let value = ms.get_counter().get_value() as u64;
debug!("endpoint_id:val - {}: {}", endpoint_id, value);
current_metrics.push((
Ids {
endpoint_id: endpoint_id.to_string(),
},
(value, Utc::now()),
));
}
}
}
}
current_metrics
}
pub async fn collect_metrics_iteration(
client: &reqwest::Client,
cached_metrics: &mut HashMap<Ids, (u64, DateTime<Utc>)>,
metric_collection_endpoint: &reqwest::Url,
hostname: String,
) -> anyhow::Result<()> {
info!(
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
);
let current_metrics = gather_proxy_io_bytes_per_client();
let metrics_to_send: Vec<Event<Ids>> = current_metrics
.iter()
.filter_map(|(curr_key, (curr_val, curr_time))| {
let mut start_time = *curr_time;
let mut value = *curr_val;
if let Some((prev_val, prev_time)) = cached_metrics.get(curr_key) {
// Only send metrics updates if the metric has changed
if curr_val - prev_val > 0 {
value = curr_val - prev_val;
start_time = *prev_time;
} else {
return None;
}
};
Some(Event {
kind: EventType::Incremental {
start_time,
stop_time: *curr_time,
},
metric: PROXY_IO_BYTES_PER_CLIENT,
idempotency_key: idempotency_key(hostname.clone()),
value,
extra: Ids {
endpoint_id: curr_key.endpoint_id.clone(),
},
})
})
.collect();
if metrics_to_send.is_empty() {
trace!("no new metrics to send");
return Ok(());
}
// Send metrics.
// Split into chunks of 1000 metrics to avoid exceeding the max request size
for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
let chunk_json = serde_json::value::to_raw_value(&EventChunk { events: chunk })
.expect("ProxyConsumptionMetric should not fail serialization");
let res = client
.post(metric_collection_endpoint.clone())
.json(&chunk_json)
.send()
.await;
let res = match res {
Ok(x) => x,
Err(err) => {
error!("failed to send metrics: {:?}", err);
continue;
}
};
if res.status().is_success() {
// update cached metrics after they were sent successfully
for send_metric in chunk {
let stop_time = match send_metric.kind {
EventType::Incremental { stop_time, .. } => stop_time,
_ => unreachable!(),
};
cached_metrics
.entry(Ids {
endpoint_id: send_metric.extra.endpoint_id.clone(),
})
// update cached value (add delta) and time
.and_modify(|e| {
e.0 += send_metric.value;
e.1 = stop_time
})
// cache new metric
.or_insert((send_metric.value, stop_time));
}
} else {
error!("metrics endpoint refused the sent metrics: {:?}", res);
}
}
Ok(())
}

View File

@@ -22,6 +22,7 @@ from itertools import chain, product
from pathlib import Path
from types import TracebackType
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
from urllib.parse import urlparse
import asyncpg
import backoff # type: ignore
@@ -2323,6 +2324,8 @@ class NeonProxy(PgProtocol):
http_port: int,
mgmt_port: int,
auth_backend: NeonProxy.AuthBackend,
metric_collection_endpoint: Optional[str] = None,
metric_collection_interval: Optional[str] = None,
):
host = "127.0.0.1"
super().__init__(dsn=auth_backend.default_conn_url, host=host, port=proxy_port)
@@ -2333,6 +2336,8 @@ class NeonProxy(PgProtocol):
self.proxy_port = proxy_port
self.mgmt_port = mgmt_port
self.auth_backend = auth_backend
self.metric_collection_endpoint = metric_collection_endpoint
self.metric_collection_interval = metric_collection_interval
self._popen: Optional[subprocess.Popen[bytes]] = None
def start(self) -> NeonProxy:
@@ -2344,6 +2349,16 @@ class NeonProxy(PgProtocol):
*["--mgmt", f"{self.host}:{self.mgmt_port}"],
*self.auth_backend.extra_args(),
]
if (
self.metric_collection_endpoint is not None
and self.metric_collection_interval is not None
):
args += [
*["--metric-collection-endpoint", self.metric_collection_endpoint],
*["--metric-collection-interval", self.metric_collection_interval],
]
self._popen = subprocess.Popen(args)
self._wait_until_ready()
return self
@@ -2357,6 +2372,25 @@ class NeonProxy(PgProtocol):
request_result.raise_for_status()
return request_result.text
@staticmethod
def get_session_id(uri_prefix, uri_line):
assert uri_prefix in uri_line
url_parts = urlparse(uri_line)
psql_session_id = url_parts.path[1:]
assert psql_session_id.isalnum(), "session_id should only contain alphanumeric chars"
return psql_session_id
@staticmethod
async def find_auth_link(link_auth_uri, proc):
for _ in range(100):
line = (await proc.stderr.readline()).decode("utf-8").strip()
log.info(f"psql line: {line}")
if link_auth_uri in line:
log.info(f"SUCCESS, found auth url: {line}")
return line
def __enter__(self) -> NeonProxy:
return self
@@ -2371,6 +2405,46 @@ class NeonProxy(PgProtocol):
# it's a child process. This is mostly to clean up in between different tests.
self._popen.kill()
@staticmethod
async def activate_link_auth(
local_vanilla_pg, proxy_with_metric_collector, psql_session_id, create_user=True
):
pg_user = "proxy"
if create_user:
log.info("creating a new user for link auth test")
local_vanilla_pg.start()
local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser")
db_info = json.dumps(
{
"session_id": psql_session_id,
"result": {
"Success": {
"host": local_vanilla_pg.default_options["host"],
"port": local_vanilla_pg.default_options["port"],
"dbname": local_vanilla_pg.default_options["dbname"],
"user": pg_user,
"aux": {
"project_id": "test_project_id",
"endpoint_id": "test_endpoint_id",
"branch_id": "test_branch_id",
},
}
},
}
)
log.info("sending session activation message")
psql = await PSQL(
host=proxy_with_metric_collector.host,
port=proxy_with_metric_collector.mgmt_port,
).run(db_info)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "ok"
@pytest.fixture(scope="function")
def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterator[NeonProxy]:

View File

@@ -1,12 +1,22 @@
#
# Test for collecting metrics from pageserver and proxy.
# Use mock HTTP server to receive metrics and verify that they look sane.
#
import time
from pathlib import Path
from typing import Iterator
import pytest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
PSQL,
NeonEnvBuilder,
NeonProxy,
PortDistributor,
RemoteStorageKind,
VanillaPostgres,
wait_for_last_flush_lsn,
)
from fixtures.types import TenantId, TimelineId
@@ -22,6 +32,10 @@ def httpserver_listen_address(port_distributor: PortDistributor):
return ("localhost", port)
# ==============================================================================
# Storage metrics tests
# ==============================================================================
initial_tenant = TenantId.generate()
remote_uploaded = 0
checks = {
@@ -161,3 +175,102 @@ def test_metric_collection(
assert len(metric_kinds_checked) == len(
checks
), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered"
# ==============================================================================
# Proxy metrics tests
# ==============================================================================
def proxy_metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
log.info("received events:")
log.info(events)
# perform basic sanity checks
for event in events:
assert event["metric"] == "proxy_io_bytes_per_client"
assert event["endpoint_id"] == "test_endpoint_id"
assert event["value"] >= 0
assert event["stop_time"] >= event["start_time"]
return Response(status=200)
@pytest.fixture(scope="session")
def proxy_with_metric_collector(
port_distributor: PortDistributor, neon_binpath: Path, httpserver_listen_address
) -> Iterator[NeonProxy]:
"""Neon proxy that routes through link auth and has metric collection enabled."""
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
metric_collection_interval = "5s"
with NeonProxy(
neon_binpath=neon_binpath,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
metric_collection_endpoint=metric_collection_endpoint,
metric_collection_interval=metric_collection_interval,
auth_backend=NeonProxy.Link(),
) as proxy:
proxy.start()
yield proxy
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
httpserver_listen_address,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
proxy_metrics_handler
)
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run(
"create table tbl as select * from generate_series(0,1000); select pg_sleep(5); select 42"
)
base_uri = proxy_with_metric_collector.link_auth_uri
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(vanilla_pg, proxy_with_metric_collector, psql_session_id)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run("insert into tbl select * from generate_series(0,1000); select pg_sleep(5); select 42")
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(
vanilla_pg, proxy_with_metric_collector, psql_session_id, create_user=False
)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
httpserver.check()

View File

@@ -1,9 +1,5 @@
import json
from urllib.parse import urlparse
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
@@ -30,62 +26,14 @@ def test_password_hack(static_proxy: NeonProxy):
@pytest.mark.asyncio
async def test_psql_session_id(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy):
def get_session_id(uri_prefix, uri_line):
assert uri_prefix in uri_line
url_parts = urlparse(uri_line)
psql_session_id = url_parts.path[1:]
assert psql_session_id.isalnum(), "session_id should only contain alphanumeric chars"
return psql_session_id
async def find_auth_link(link_auth_uri, proc):
for _ in range(100):
line = (await proc.stderr.readline()).decode("utf-8").strip()
log.info(f"psql line: {line}")
if link_auth_uri in line:
log.info(f"SUCCESS, found auth url: {line}")
return line
async def activate_link_auth(local_vanilla_pg, link_proxy, psql_session_id):
pg_user = "proxy"
log.info("creating a new user for link auth test")
local_vanilla_pg.start()
local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser")
db_info = json.dumps(
{
"session_id": psql_session_id,
"result": {
"Success": {
"host": local_vanilla_pg.default_options["host"],
"port": local_vanilla_pg.default_options["port"],
"dbname": local_vanilla_pg.default_options["dbname"],
"user": pg_user,
"aux": {
"project_id": "project",
"endpoint_id": "endpoint",
"branch_id": "branch",
},
}
},
}
)
log.info("sending session activation message")
psql = await PSQL(host=link_proxy.host, port=link_proxy.mgmt_port).run(db_info)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "ok"
psql = await PSQL(host=link_proxy.host, port=link_proxy.proxy_port).run("select 42")
base_uri = link_proxy.link_auth_uri
link = await find_auth_link(base_uri, psql)
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = get_session_id(base_uri, link)
await activate_link_auth(vanilla_pg, link_proxy, psql_session_id)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(vanilla_pg, link_proxy, psql_session_id)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()