Implement consumption metrics collection.

Add new background job to collect billing metrics for each tenant and
send them to the HTTP endpoint.
Metrics are cached, so we don't send non-changed metrics.

Add metric collection config parameters:
metric_collection_endpoint (default None, i.e. disabled)
metric_collection_interval (default 60s)

Add test_metric_collection.py to test metric collection
and sending to the mocked HTTP endpoint.

Use port distributor in metric_collection test

review fixes: only update cache after metrics were send successfully, simplify code

disable metric collection if metric_collection_endpoint is not provided in config
This commit is contained in:
Anastasia Lubennikova
2022-11-29 20:50:58 +02:00
parent 43fd89eaa7
commit 4235f97c6a
13 changed files with 557 additions and 12 deletions

3
Cargo.lock generated
View File

@@ -2415,6 +2415,7 @@ dependencies = [
"rand",
"regex",
"remote_storage",
"reqwest",
"rstar",
"scopeguard",
"serde",
@@ -4753,6 +4754,7 @@ dependencies = [
"ahash",
"anyhow",
"bytes",
"chrono",
"clap 4.0.29",
"crossbeam-utils",
"either",
@@ -4776,6 +4778,7 @@ dependencies = [
"reqwest",
"scopeguard",
"serde",
"serde_json",
"socket2",
"stable_deref_trait",
"syn",

View File

@@ -18,7 +18,7 @@ async-stream = "0.3"
async-trait = "0.1"
byteorder = "1.4.3"
bytes = "1.0.1"
chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
chrono = { version = "0.4.23", default-features = false, features = ["clock", "serde"] }
clap = { version = "4.0", features = ["string"] }
close_fds = "0.3.2"
const_format = "0.2.21"
@@ -45,7 +45,7 @@ regex = "1.4.5"
rstar = "0.9.3"
scopeguard = "1.1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_json = { version = "1.0", features = ["raw_value"] }
serde_with = "2.0"
signal-hook = "0.3.10"
svg_fmt = "0.4.1"
@@ -69,6 +69,7 @@ storage_broker = { version = "0.1", path = "../storage_broker" }
tenant_size_model = { path = "../libs/tenant_size_model" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
reqwest = "0.11.13"
[dev-dependencies]
criterion = "0.4"

View File

@@ -0,0 +1,283 @@
//!
//! Periodically collect consumption metrics for all active tenants
//! and push them to a HTTP endpoint.
//! Cache metrics to send only the updated ones.
//!
use anyhow;
use tracing::*;
use utils::id::TimelineId;
use crate::task_mgr;
use crate::tenant_mgr;
use pageserver_api::models::TenantState;
use utils::id::TenantId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::time::Duration;
use chrono::{DateTime, Utc};
use reqwest::Url;
/// BillingMetric struct that defines the format for one metric entry
/// i.e.
///
/// ```json
/// {
/// "metric": "remote_storage_size",
/// "type": "absolute",
/// "tenant_id": "5d07d9ce9237c4cd845ea7918c0afa7d",
/// "timeline_id": "00000000000000000000000000000000",
/// "time": ...,
/// "value": 12345454,
/// }
/// ```
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct BillingMetric {
pub metric: BillingMetricKind,
pub metric_type: &'static str,
pub tenant_id: TenantId,
pub timeline_id: Option<TimelineId>,
pub time: DateTime<Utc>,
pub value: u64,
}
impl BillingMetric {
pub fn new_absolute(
metric: BillingMetricKind,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
value: u64,
) -> Self {
Self {
metric,
metric_type: "absolute",
tenant_id,
timeline_id,
time: Utc::now(),
value,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BillingMetricKind {
/// Amount of WAL produced , by a timeline, i.e. last_record_lsn
/// This is an absolute, per-timeline metric.
WrittenSize,
/// Size of all tenant branches including WAL
/// This is an absolute, per-tenant metric.
/// This is the same metric that tenant/tenant_id/size endpoint returns.
SyntheticStorageSize,
/// Size of all the files in the tenant's directory on disk on the pageserver.
/// This is an absolute, per-tenant metric.
/// See also prometheus metric CURRENT_PHYSICAL_SIZE.
PhysicalSize,
/// Size of the remote storage (S3) directory.
/// This is an absolute, per-tenant metric.
RemoteStorageSize,
}
impl FromStr for BillingMetricKind {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"written_size" => Ok(Self::WrittenSize),
"synthetic_storage_size" => Ok(Self::SyntheticStorageSize),
"physical_size" => Ok(Self::PhysicalSize),
"remote_storage_size" => Ok(Self::RemoteStorageSize),
_ => anyhow::bail!("invalid value \"{s}\" for metric type"),
}
}
}
impl fmt::Display for BillingMetricKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
BillingMetricKind::WrittenSize => "written_size",
BillingMetricKind::SyntheticStorageSize => "synthetic_storage_size",
BillingMetricKind::PhysicalSize => "physical_size",
BillingMetricKind::RemoteStorageSize => "remote_storage_size",
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BillingMetricsKey {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
metric: BillingMetricKind,
}
#[derive(serde::Serialize)]
struct EventChunk<'a> {
events: &'a [BillingMetric],
}
/// Main thread that serves metrics collection
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
metric_collection_interval: Duration,
) -> anyhow::Result<()> {
let mut ticker = tokio::time::interval(metric_collection_interval);
info!("starting collect_metrics");
// define client here to reuse it for all requests
let client = reqwest::Client::new();
let mut cached_metrics: HashMap<BillingMetricsKey, u64> = HashMap::new();
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("collect_metrics received cancellation request");
return Ok(());
},
_ = ticker.tick() => {
collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint).await?;
}
}
}
}
/// One iteration of metrics collection
///
/// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`.
/// Cache metrics to avoid sending the same metrics multiple times.
pub async fn collect_metrics_task(
client: &reqwest::Client,
cached_metrics: &mut HashMap<BillingMetricsKey, u64>,
metric_collection_endpoint: &reqwest::Url,
) -> anyhow::Result<()> {
let mut current_metrics: Vec<(BillingMetricsKey, u64)> = Vec::new();
trace!(
"starting collect_metrics_task. metric_collection_endpoint: {}",
metric_collection_endpoint
);
// get list of tenants
let tenants = tenant_mgr::list_tenants().await;
// iterate through list of Active tenants and collect metrics
for (tenant_id, tenant_state) in tenants {
if tenant_state != TenantState::Active {
continue;
}
let tenant = tenant_mgr::get_tenant(tenant_id, true).await?;
let mut tenant_physical_size = 0;
// iterate through list of timelines in tenant
for timeline in tenant.list_timelines().iter() {
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
current_metrics.push((
BillingMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: BillingMetricKind::WrittenSize,
},
timeline_written_size,
));
let timeline_size = timeline.get_physical_size();
tenant_physical_size += timeline_size;
debug!(
"per-timeline current metrics for tenant: {}: timeline {} physical_size={} last_record_lsn {} (as bytes)",
tenant_id, timeline.timeline_id, timeline_size, timeline_written_size)
}
let tenant_remote_size = tenant.get_remote_size().await?;
debug!(
"collected current metrics for tenant: {}: state={:?} tenant_physical_size={} remote_size={}",
tenant_id, tenant_state, tenant_physical_size, tenant_remote_size
);
current_metrics.push((
BillingMetricsKey {
tenant_id,
timeline_id: None,
metric: BillingMetricKind::PhysicalSize,
},
tenant_physical_size,
));
current_metrics.push((
BillingMetricsKey {
tenant_id,
timeline_id: None,
metric: BillingMetricKind::RemoteStorageSize,
},
tenant_remote_size,
));
// TODO add SyntheticStorageSize metric
}
// Filter metrics
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
Some(val) => val != curr_val,
None => true,
});
if current_metrics.is_empty() {
trace!("no new metrics to send");
return Ok(());
}
// Send metrics.
// Split into chunks of 1000 metrics to avoid exceeding the max request size
const CHUNK_SIZE: usize = 1000;
let chunks = current_metrics.chunks(CHUNK_SIZE);
let mut chunk_to_send: Vec<BillingMetric> = Vec::with_capacity(1000);
for chunk in chunks {
chunk_to_send.clear();
// enrich metrics with timestamp and metric_kind before sending
chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| {
BillingMetric::new_absolute(
curr_key.metric,
curr_key.tenant_id,
curr_key.timeline_id,
*curr_val,
)
}));
let chunk_json = serde_json::value::to_raw_value(&EventChunk {
events: &chunk_to_send,
})
.expect("BillingMetric should not fail serialization");
let res = client
.post(metric_collection_endpoint.clone())
.json(&chunk_json)
.send()
.await;
match res {
Ok(res) => {
if res.status().is_success() {
// update cached metrics after they were sent successfully
for (curr_key, curr_val) in chunk.iter() {
cached_metrics.insert(curr_key.clone(), *curr_val);
}
} else {
error!("metrics endpoint refused the sent metrics: {:?}", res);
}
}
Err(err) => {
error!("failed to send metrics: {:?}", err);
}
}
}
Ok(())
}

View File

@@ -310,6 +310,26 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
Ok(())
},
);
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
"consumption metrics collection",
true,
async move {
pageserver::billing_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
)
.instrument(info_span!("metrics_collection"))
.await?;
Ok(())
},
);
}
}
// Spawn a task to listen for libpq connections. It will spawn further tasks

View File

@@ -12,6 +12,7 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
use once_cell::sync::OnceCell;
use reqwest::Url;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::str::FromStr;
@@ -55,6 +56,8 @@ pub mod defaults {
pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize =
super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "60 s";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
///
/// Default built-in configuration file.
///
@@ -78,6 +81,8 @@ pub mod defaults {
#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -144,6 +149,10 @@ pub struct PageServerConf {
/// Number of concurrent [`Tenant::gather_size_inputs`] allowed.
pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
// How often to collect metrics and send them to the metrics endpoint.
pub metric_collection_interval: Duration,
pub metric_collection_endpoint: Option<Url>,
pub test_remote_failures: u64,
}
@@ -224,6 +233,9 @@ struct PageServerConfigBuilder {
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
metric_collection_interval: BuilderValue<Duration>,
metric_collection_endpoint: BuilderValue<Option<Url>>,
test_remote_failures: BuilderValue<u64>,
}
@@ -260,6 +272,11 @@ impl Default for PageServerConfigBuilder {
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()),
metric_collection_interval: Set(humantime::parse_duration(
DEFAULT_METRIC_COLLECTION_INTERVAL,
)
.expect("cannot parse default metric collection interval")),
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
test_remote_failures: Set(0),
}
@@ -342,6 +359,14 @@ impl PageServerConfigBuilder {
self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
}
pub fn metric_collection_interval(&mut self, metric_collection_interval: Duration) {
self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
}
pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
}
pub fn test_remote_failures(&mut self, fail_first: u64) {
self.test_remote_failures = BuilderValue::Set(fail_first);
}
@@ -394,6 +419,12 @@ impl PageServerConfigBuilder {
.ok_or(anyhow!(
"missing concurrent_tenant_size_logical_size_queries"
))?,
metric_collection_interval: self
.metric_collection_interval
.ok_or(anyhow!("missing metric_collection_interval"))?,
metric_collection_endpoint: self
.metric_collection_endpoint
.ok_or(anyhow!("missing metric_collection_endpoint"))?,
test_remote_failures: self
.test_remote_failures
.ok_or(anyhow!("missing test_remote_failuers"))?,
@@ -568,6 +599,12 @@ impl PageServerConf {
let permits = NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?;
ConfigurableSemaphore::new(permits)
}),
"metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
"metric_collection_endpoint" => {
let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
builder.metric_collection_endpoint(Some(endpoint));
},
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
}
@@ -690,6 +727,8 @@ impl PageServerConf {
broker_keepalive_interval: Duration::from_secs(5000),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
metric_collection_interval: Duration::from_secs(60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
test_remote_failures: 0,
}
}
@@ -821,6 +860,8 @@ max_file_descriptors = 333
initial_superuser_name = 'zzzz'
id = 10
metric_collection_interval = '222 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
log_format = 'json'
"#;
@@ -864,6 +905,10 @@ log_format = 'json'
)?,
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
metric_collection_interval: humantime::parse_duration(
defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
)?,
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
test_remote_failures: 0,
},
"Correct defaults should be used when no config values are provided"
@@ -909,6 +954,8 @@ log_format = 'json'
broker_keepalive_interval: Duration::from_secs(5),
log_format: LogFormat::Json,
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
metric_collection_interval: Duration::from_secs(222),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
test_remote_failures: 0,
},
"Should be able to parse all basic config values correctly"

View File

@@ -1,5 +1,6 @@
mod auth;
pub mod basebackup;
pub mod billing_metrics;
pub mod config;
pub mod http;
pub mod import_datadir;

View File

@@ -517,6 +517,10 @@ impl RemoteTimelineClient {
self.metrics.remote_physical_size_gauge().set(size);
}
pub fn get_remote_physical_size(&self) -> u64 {
self.metrics.remote_physical_size_gauge().get()
}
//
// Download operations.
//

View File

@@ -203,6 +203,9 @@ pub enum TaskKind {
// task that handles attaching a tenant
Attach,
// task that handhes metrics collection
MetricsCollection,
}
#[derive(Default)]

View File

@@ -700,6 +700,22 @@ impl Tenant {
Ok(())
}
/// get size of all remote timelines
///
/// This function relies on the index_part instead of listing the remote storage
///
pub async fn get_remote_size(&self) -> anyhow::Result<u64> {
let mut size = 0;
for timeline in self.list_timelines().iter() {
if let Some(remote_client) = &timeline.remote_client {
size += remote_client.get_remote_physical_size();
}
}
Ok(size)
}
#[instrument(skip(self, index_part, remote_metadata, remote_storage), fields(timeline_id=%timeline_id))]
async fn load_remote_timeline(
&self,

44
poetry.lock generated
View File

@@ -11,7 +11,7 @@ async-timeout = ">=3.0,<5.0"
psycopg2-binary = ">=2.8.4"
[package.extras]
sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]
sa = ["sqlalchemy[postgresql_psycopg2binary] (>=1.3,<1.5)"]
[[package]]
name = "allure-pytest"
@@ -80,7 +80,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
dev = ["cloudpickle", "coverage[toml] (>=5.0.2)", "furo", "hypothesis", "mypy", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "sphinx", "sphinx-notfound-page", "zope.interface"]
docs = ["furo", "sphinx", "sphinx-notfound-page", "zope.interface"]
tests = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "zope.interface"]
tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"]
tests_no_zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"]
[[package]]
name = "aws-sam-translator"
@@ -569,7 +569,7 @@ optional = false
python-versions = ">=3.6.0"
[package.extras]
unicode-backport = ["unicodedata2"]
unicode_backport = ["unicodedata2"]
[[package]]
name = "click"
@@ -747,9 +747,9 @@ python-versions = ">=3.6.1,<4.0"
[package.extras]
colors = ["colorama (>=0.4.3,<0.5.0)"]
pipfile-deprecated-finder = ["pipreqs", "requirementslib"]
pipfile_deprecated_finder = ["pipreqs", "requirementslib"]
plugins = ["setuptools"]
requirements-deprecated-finder = ["pip-api", "pipreqs"]
requirements_deprecated_finder = ["pip-api", "pipreqs"]
[[package]]
name = "itsdangerous"
@@ -824,7 +824,7 @@ python-versions = ">=2.7"
[package.extras]
docs = ["jaraco.packaging (>=3.2)", "rst.linker (>=1.9)", "sphinx"]
testing = ["ecdsa", "enum34", "feedparser", "jsonlib", "numpy", "pandas", "pymongo", "pytest (>=3.5,!=3.7.3)", "pytest-black-multipy", "pytest-checkdocs (>=1.2.3)", "pytest-cov", "pytest-flake8 (<1.1.0)", "pytest-flake8 (>=1.1.1)", "scikit-learn", "sqlalchemy"]
testing-libs = ["simplejson", "ujson", "yajl"]
"testing.libs" = ["simplejson", "ujson", "yajl"]
[[package]]
name = "jsonpointer"
@@ -850,7 +850,7 @@ six = ">=1.11.0"
[package.extras]
format = ["idna", "jsonpointer (>1.13)", "rfc3987", "strict-rfc3339", "webcolors"]
format-nongpl = ["idna", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "webcolors"]
format_nongpl = ["idna", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "webcolors"]
[[package]]
name = "junit-xml"
@@ -1227,6 +1227,17 @@ pytest = ">=6.1.0"
[package.extras]
testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy (>=0.931)", "pytest-trio (>=0.7.0)"]
[[package]]
name = "pytest-httpserver"
version = "1.0.6"
description = "pytest-httpserver is a httpserver for pytest"
category = "main"
optional = false
python-versions = ">=3.7,<4.0"
[package.dependencies]
Werkzeug = ">=2.0.0"
[[package]]
name = "pytest-lazy-fixture"
version = "0.6.3"
@@ -1350,7 +1361,7 @@ urllib3 = ">=1.21.1,<1.27"
[package.extras]
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
use_chardet_on_py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "responses"
@@ -1583,7 +1594,7 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "98d63eaa73253882440e0fc8cdb305bb536944768c5ba313c25d0ee65f546544"
content-hash = "55aba66810d5b47d25372c740e4d466e1e791c4d0e665c57a611ab8665563689"
[metadata.files]
aiopg = [
@@ -2099,7 +2110,18 @@ py = [
{file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
]
pyasn1 = [
{file = "pyasn1-0.4.8-py2.4.egg", hash = "sha256:fec3e9d8e36808a28efb59b489e4528c10ad0f480e57dcc32b4de5c9d8c9fdf3"},
{file = "pyasn1-0.4.8-py2.5.egg", hash = "sha256:0458773cfe65b153891ac249bcf1b5f8f320b7c2ce462151f8fa74de8934becf"},
{file = "pyasn1-0.4.8-py2.6.egg", hash = "sha256:5c9414dcfede6e441f7e8f81b43b34e834731003427e5b09e4e00e3172a10f00"},
{file = "pyasn1-0.4.8-py2.7.egg", hash = "sha256:6e7545f1a61025a4e58bb336952c5061697da694db1cae97b116e9c46abcf7c8"},
{file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"},
{file = "pyasn1-0.4.8-py3.1.egg", hash = "sha256:78fa6da68ed2727915c4767bb386ab32cdba863caa7dbe473eaae45f9959da86"},
{file = "pyasn1-0.4.8-py3.2.egg", hash = "sha256:08c3c53b75eaa48d71cf8c710312316392ed40899cb34710d092e96745a358b7"},
{file = "pyasn1-0.4.8-py3.3.egg", hash = "sha256:03840c999ba71680a131cfaee6fab142e1ed9bbd9c693e285cc6aca0d555e576"},
{file = "pyasn1-0.4.8-py3.4.egg", hash = "sha256:7ab8a544af125fb704feadb008c99a88805126fb525280b2270bb25cc1d78a12"},
{file = "pyasn1-0.4.8-py3.5.egg", hash = "sha256:e89bf84b5437b532b0803ba5c9a5e054d21fec423a89952a74f87fa2c9b7bce2"},
{file = "pyasn1-0.4.8-py3.6.egg", hash = "sha256:014c0e9976956a08139dc0712ae195324a75e142284d5f87f1a87ee1b068a359"},
{file = "pyasn1-0.4.8-py3.7.egg", hash = "sha256:99fcc3c8d804d1bc6d9a099921e39d827026409a58f2a720dcdb89374ea0c776"},
{file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"},
]
pycodestyle = [
@@ -2157,6 +2179,10 @@ pytest-asyncio = [
{file = "pytest-asyncio-0.19.0.tar.gz", hash = "sha256:ac4ebf3b6207259750bc32f4c1d8fcd7e79739edbc67ad0c58dd150b1d072fed"},
{file = "pytest_asyncio-0.19.0-py3-none-any.whl", hash = "sha256:7a97e37cfe1ed296e2e84941384bdd37c376453912d397ed39293e0916f521fa"},
]
pytest-httpserver = [
{file = "pytest_httpserver-1.0.6-py3-none-any.whl", hash = "sha256:ac2379acc91fe8bdbe2911c93af8dd130e33b5899fb9934d15669480739c6d32"},
{file = "pytest_httpserver-1.0.6.tar.gz", hash = "sha256:9040d07bf59ac45d8de3db1d4468fd2d1d607975e4da4c872ecc0402cdbf7b3e"},
]
pytest-lazy-fixture = [
{file = "pytest-lazy-fixture-0.6.3.tar.gz", hash = "sha256:0e7d0c7f74ba33e6e80905e9bfd81f9d15ef9a790de97993e34213deb5ad10ac"},
{file = "pytest_lazy_fixture-0.6.3-py3-none-any.whl", hash = "sha256:e0b379f38299ff27a653f03eaa69b08a6fd4484e46fd1c9907d984b9f9daeda6"},

View File

@@ -32,6 +32,7 @@ toml = "^0.10.2"
psutil = "^5.9.4"
types-psutil = "^5.9.5.4"
types-toml = "^0.10.8"
pytest-httpserver = "^1.0.6"
[tool.poetry.dev-dependencies]
flake8 = "^5.0.4"

View File

@@ -0,0 +1,138 @@
import pytest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PortDistributor,
RemoteStorageKind,
wait_for_last_flush_lsn,
)
from fixtures.types import TenantId, TimelineId
from fixtures.utils import query_scalar
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@pytest.fixture(scope="session")
def httpserver_listen_address(port_distributor: PortDistributor):
port = port_distributor.get_port()
return ("localhost", port)
num_metrics_received = 0
remote_uploaded = 0
#
# verify that metrics look minilally sane
#
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
log.info("received events:")
log.info(events)
checks = {
"written_size": lambda value: value > 0,
"physical_size": lambda value: value >= 0,
# >= 0 check here is to avoid race condition when we receive metrics before
# remote_uploaded is updated
"remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0,
}
for event in events:
assert checks.pop(event["metric"])(event["value"]), f"{event['metric']} isn't valid"
assert not checks, f"{' '.join(checks.keys())} wasn't/weren't received"
global num_metrics_received
num_metrics_received += 1
return Response(status=200)
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
)
def test_metric_collection(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
remote_storage_kind: RemoteStorageKind,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = (
f"""
metric_collection_endpoint="{metric_collection_endpoint}"
"""
+ "tenant_config={pitr_interval = '0 sec'}"
)
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_metric_collection",
)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_metric_collection")
pg = env.postgres.create_start("test_metric_collection")
pg_conn = pg.connect()
cur = pg_conn.cursor()
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Helper function that gets the number of given kind of remote ops from the metrics
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
):
total += sample[2]
return int(total)
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
global remote_uploaded
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# check that all requests are served
httpserver.check()
global num_metrics_received
assert num_metrics_received > 0, "no metrics were received"

View File

@@ -16,6 +16,7 @@ publish = false
ahash = { version = "0.7", features = ["std"] }
anyhow = { version = "1", features = ["backtrace", "std"] }
bytes = { version = "1", features = ["serde", "std"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "iana-time-zone", "serde", "std", "winapi"] }
clap = { version = "4", features = ["color", "derive", "error-context", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] }
@@ -36,9 +37,10 @@ prost = { version = "0.11", features = ["prost-derive", "std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
reqwest = { version = "0.11", default-features = false, features = ["__rustls", "__tls", "blocking", "default-tls", "hyper-rustls", "hyper-tls", "json", "native-tls-crate", "rustls", "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "tokio-native-tls", "tokio-rustls", "webpki-roots"] }
reqwest = { version = "0.11", features = ["__rustls", "__tls", "blocking", "default-tls", "hyper-rustls", "hyper-tls", "json", "native-tls-crate", "rustls", "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "tokio-native-tls", "tokio-rustls", "webpki-roots"] }
scopeguard = { version = "1", features = ["use_std"] }
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }
serde_json = { version = "1", features = ["raw_value", "std"] }
socket2 = { version = "0.4", default-features = false, features = ["all"] }
stable_deref_trait = { version = "1", features = ["alloc", "std"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros"] }