From 4235f97c6a3276d90a1c3630fa78d4b1495df260 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Tue, 29 Nov 2022 20:50:58 +0200 Subject: [PATCH] Implement consumption metrics collection. Add new background job to collect billing metrics for each tenant and send them to the HTTP endpoint. Metrics are cached, so we don't send non-changed metrics. Add metric collection config parameters: metric_collection_endpoint (default None, i.e. disabled) metric_collection_interval (default 60s) Add test_metric_collection.py to test metric collection and sending to the mocked HTTP endpoint. Use port distributor in metric_collection test review fixes: only update cache after metrics were send successfully, simplify code disable metric collection if metric_collection_endpoint is not provided in config --- Cargo.lock | 3 + pageserver/Cargo.toml | 5 +- pageserver/src/billing_metrics.rs | 283 ++++++++++++++++++ pageserver/src/bin/pageserver.rs | 20 ++ pageserver/src/config.rs | 47 +++ pageserver/src/lib.rs | 1 + pageserver/src/storage_sync2.rs | 4 + pageserver/src/task_mgr.rs | 3 + pageserver/src/tenant.rs | 16 + poetry.lock | 44 ++- pyproject.toml | 1 + test_runner/regress/test_metric_collection.py | 138 +++++++++ workspace_hack/Cargo.toml | 4 +- 13 files changed, 557 insertions(+), 12 deletions(-) create mode 100644 pageserver/src/billing_metrics.rs create mode 100644 test_runner/regress/test_metric_collection.py diff --git a/Cargo.lock b/Cargo.lock index 665000746d..2737a4d934 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2415,6 +2415,7 @@ dependencies = [ "rand", "regex", "remote_storage", + "reqwest", "rstar", "scopeguard", "serde", @@ -4753,6 +4754,7 @@ dependencies = [ "ahash", "anyhow", "bytes", + "chrono", "clap 4.0.29", "crossbeam-utils", "either", @@ -4776,6 +4778,7 @@ dependencies = [ "reqwest", "scopeguard", "serde", + "serde_json", "socket2", "stable_deref_trait", "syn", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 24642ca2f7..f5acfcbdc0 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -18,7 +18,7 @@ async-stream = "0.3" async-trait = "0.1" byteorder = "1.4.3" bytes = "1.0.1" -chrono = { version = "0.4.23", default-features = false, features = ["clock"] } +chrono = { version = "0.4.23", default-features = false, features = ["clock", "serde"] } clap = { version = "4.0", features = ["string"] } close_fds = "0.3.2" const_format = "0.2.21" @@ -45,7 +45,7 @@ regex = "1.4.5" rstar = "0.9.3" scopeguard = "1.1.0" serde = { version = "1.0", features = ["derive"] } -serde_json = "1" +serde_json = { version = "1.0", features = ["raw_value"] } serde_with = "2.0" signal-hook = "0.3.10" svg_fmt = "0.4.1" @@ -69,6 +69,7 @@ storage_broker = { version = "0.1", path = "../storage_broker" } tenant_size_model = { path = "../libs/tenant_size_model" } utils = { path = "../libs/utils" } workspace_hack = { version = "0.1", path = "../workspace_hack" } +reqwest = "0.11.13" [dev-dependencies] criterion = "0.4" diff --git a/pageserver/src/billing_metrics.rs b/pageserver/src/billing_metrics.rs new file mode 100644 index 0000000000..c5da54b8fc --- /dev/null +++ b/pageserver/src/billing_metrics.rs @@ -0,0 +1,283 @@ +//! +//! Periodically collect consumption metrics for all active tenants +//! and push them to a HTTP endpoint. +//! Cache metrics to send only the updated ones. +//! + +use anyhow; +use tracing::*; +use utils::id::TimelineId; + +use crate::task_mgr; +use crate::tenant_mgr; +use pageserver_api::models::TenantState; +use utils::id::TenantId; + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fmt; +use std::str::FromStr; +use std::time::Duration; + +use chrono::{DateTime, Utc}; +use reqwest::Url; + +/// BillingMetric struct that defines the format for one metric entry +/// i.e. +/// +/// ```json +/// { +/// "metric": "remote_storage_size", +/// "type": "absolute", +/// "tenant_id": "5d07d9ce9237c4cd845ea7918c0afa7d", +/// "timeline_id": "00000000000000000000000000000000", +/// "time": ..., +/// "value": 12345454, +/// } +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] +pub struct BillingMetric { + pub metric: BillingMetricKind, + pub metric_type: &'static str, + pub tenant_id: TenantId, + pub timeline_id: Option, + pub time: DateTime, + pub value: u64, +} + +impl BillingMetric { + pub fn new_absolute( + metric: BillingMetricKind, + tenant_id: TenantId, + timeline_id: Option, + value: u64, + ) -> Self { + Self { + metric, + metric_type: "absolute", + tenant_id, + timeline_id, + time: Utc::now(), + value, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum BillingMetricKind { + /// Amount of WAL produced , by a timeline, i.e. last_record_lsn + /// This is an absolute, per-timeline metric. + WrittenSize, + /// Size of all tenant branches including WAL + /// This is an absolute, per-tenant metric. + /// This is the same metric that tenant/tenant_id/size endpoint returns. + SyntheticStorageSize, + /// Size of all the files in the tenant's directory on disk on the pageserver. + /// This is an absolute, per-tenant metric. + /// See also prometheus metric CURRENT_PHYSICAL_SIZE. + PhysicalSize, + /// Size of the remote storage (S3) directory. + /// This is an absolute, per-tenant metric. + RemoteStorageSize, +} + +impl FromStr for BillingMetricKind { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "written_size" => Ok(Self::WrittenSize), + "synthetic_storage_size" => Ok(Self::SyntheticStorageSize), + "physical_size" => Ok(Self::PhysicalSize), + "remote_storage_size" => Ok(Self::RemoteStorageSize), + _ => anyhow::bail!("invalid value \"{s}\" for metric type"), + } + } +} + +impl fmt::Display for BillingMetricKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + BillingMetricKind::WrittenSize => "written_size", + BillingMetricKind::SyntheticStorageSize => "synthetic_storage_size", + BillingMetricKind::PhysicalSize => "physical_size", + BillingMetricKind::RemoteStorageSize => "remote_storage_size", + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BillingMetricsKey { + tenant_id: TenantId, + timeline_id: Option, + metric: BillingMetricKind, +} + +#[derive(serde::Serialize)] +struct EventChunk<'a> { + events: &'a [BillingMetric], +} + +/// Main thread that serves metrics collection +pub async fn collect_metrics( + metric_collection_endpoint: &Url, + metric_collection_interval: Duration, +) -> anyhow::Result<()> { + let mut ticker = tokio::time::interval(metric_collection_interval); + + info!("starting collect_metrics"); + + // define client here to reuse it for all requests + let client = reqwest::Client::new(); + let mut cached_metrics: HashMap = HashMap::new(); + + loop { + tokio::select! { + _ = task_mgr::shutdown_watcher() => { + info!("collect_metrics received cancellation request"); + return Ok(()); + }, + _ = ticker.tick() => { + collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint).await?; + } + } + } +} + +/// One iteration of metrics collection +/// +/// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`. +/// Cache metrics to avoid sending the same metrics multiple times. +pub async fn collect_metrics_task( + client: &reqwest::Client, + cached_metrics: &mut HashMap, + metric_collection_endpoint: &reqwest::Url, +) -> anyhow::Result<()> { + let mut current_metrics: Vec<(BillingMetricsKey, u64)> = Vec::new(); + trace!( + "starting collect_metrics_task. metric_collection_endpoint: {}", + metric_collection_endpoint + ); + + // get list of tenants + let tenants = tenant_mgr::list_tenants().await; + + // iterate through list of Active tenants and collect metrics + for (tenant_id, tenant_state) in tenants { + if tenant_state != TenantState::Active { + continue; + } + + let tenant = tenant_mgr::get_tenant(tenant_id, true).await?; + + let mut tenant_physical_size = 0; + + // iterate through list of timelines in tenant + for timeline in tenant.list_timelines().iter() { + let timeline_written_size = u64::from(timeline.get_last_record_lsn()); + + current_metrics.push(( + BillingMetricsKey { + tenant_id, + timeline_id: Some(timeline.timeline_id), + metric: BillingMetricKind::WrittenSize, + }, + timeline_written_size, + )); + + let timeline_size = timeline.get_physical_size(); + tenant_physical_size += timeline_size; + + debug!( + "per-timeline current metrics for tenant: {}: timeline {} physical_size={} last_record_lsn {} (as bytes)", + tenant_id, timeline.timeline_id, timeline_size, timeline_written_size) + } + + let tenant_remote_size = tenant.get_remote_size().await?; + debug!( + "collected current metrics for tenant: {}: state={:?} tenant_physical_size={} remote_size={}", + tenant_id, tenant_state, tenant_physical_size, tenant_remote_size + ); + + current_metrics.push(( + BillingMetricsKey { + tenant_id, + timeline_id: None, + metric: BillingMetricKind::PhysicalSize, + }, + tenant_physical_size, + )); + + current_metrics.push(( + BillingMetricsKey { + tenant_id, + timeline_id: None, + metric: BillingMetricKind::RemoteStorageSize, + }, + tenant_remote_size, + )); + + // TODO add SyntheticStorageSize metric + } + + // Filter metrics + current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) { + Some(val) => val != curr_val, + None => true, + }); + + if current_metrics.is_empty() { + trace!("no new metrics to send"); + return Ok(()); + } + + // Send metrics. + // Split into chunks of 1000 metrics to avoid exceeding the max request size + const CHUNK_SIZE: usize = 1000; + let chunks = current_metrics.chunks(CHUNK_SIZE); + + let mut chunk_to_send: Vec = Vec::with_capacity(1000); + + for chunk in chunks { + chunk_to_send.clear(); + // enrich metrics with timestamp and metric_kind before sending + chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| { + BillingMetric::new_absolute( + curr_key.metric, + curr_key.tenant_id, + curr_key.timeline_id, + *curr_val, + ) + })); + + let chunk_json = serde_json::value::to_raw_value(&EventChunk { + events: &chunk_to_send, + }) + .expect("BillingMetric should not fail serialization"); + + let res = client + .post(metric_collection_endpoint.clone()) + .json(&chunk_json) + .send() + .await; + + match res { + Ok(res) => { + if res.status().is_success() { + // update cached metrics after they were sent successfully + for (curr_key, curr_val) in chunk.iter() { + cached_metrics.insert(curr_key.clone(), *curr_val); + } + } else { + error!("metrics endpoint refused the sent metrics: {:?}", res); + } + } + Err(err) => { + error!("failed to send metrics: {:?}", err); + } + } + } + + Ok(()) +} diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 86ce318d0a..cc403ec2ea 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -310,6 +310,26 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> { Ok(()) }, ); + + if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint { + task_mgr::spawn( + MGMT_REQUEST_RUNTIME.handle(), + TaskKind::MetricsCollection, + None, + None, + "consumption metrics collection", + true, + async move { + pageserver::billing_metrics::collect_metrics( + metric_collection_endpoint, + conf.metric_collection_interval, + ) + .instrument(info_span!("metrics_collection")) + .await?; + Ok(()) + }, + ); + } } // Spawn a task to listen for libpq connections. It will spawn further tasks diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 93c221e622..c6f417390f 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -12,6 +12,7 @@ use utils::crashsafe::path_with_suffix_extension; use utils::id::ConnectionId; use once_cell::sync::OnceCell; +use reqwest::Url; use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -55,6 +56,8 @@ pub mod defaults { pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize = super::ConfigurableSemaphore::DEFAULT_INITIAL.get(); + pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "60 s"; + pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option = None; /// /// Default built-in configuration file. /// @@ -78,6 +81,8 @@ pub mod defaults { #concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}' +#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}' + # [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -144,6 +149,10 @@ pub struct PageServerConf { /// Number of concurrent [`Tenant::gather_size_inputs`] allowed. pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore, + // How often to collect metrics and send them to the metrics endpoint. + pub metric_collection_interval: Duration, + pub metric_collection_endpoint: Option, + pub test_remote_failures: u64, } @@ -224,6 +233,9 @@ struct PageServerConfigBuilder { concurrent_tenant_size_logical_size_queries: BuilderValue, + metric_collection_interval: BuilderValue, + metric_collection_endpoint: BuilderValue>, + test_remote_failures: BuilderValue, } @@ -260,6 +272,11 @@ impl Default for PageServerConfigBuilder { log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()), concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()), + metric_collection_interval: Set(humantime::parse_duration( + DEFAULT_METRIC_COLLECTION_INTERVAL, + ) + .expect("cannot parse default metric collection interval")), + metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT), test_remote_failures: Set(0), } @@ -342,6 +359,14 @@ impl PageServerConfigBuilder { self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u); } + pub fn metric_collection_interval(&mut self, metric_collection_interval: Duration) { + self.metric_collection_interval = BuilderValue::Set(metric_collection_interval) + } + + pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option) { + self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint) + } + pub fn test_remote_failures(&mut self, fail_first: u64) { self.test_remote_failures = BuilderValue::Set(fail_first); } @@ -394,6 +419,12 @@ impl PageServerConfigBuilder { .ok_or(anyhow!( "missing concurrent_tenant_size_logical_size_queries" ))?, + metric_collection_interval: self + .metric_collection_interval + .ok_or(anyhow!("missing metric_collection_interval"))?, + metric_collection_endpoint: self + .metric_collection_endpoint + .ok_or(anyhow!("missing metric_collection_endpoint"))?, test_remote_failures: self .test_remote_failures .ok_or(anyhow!("missing test_remote_failuers"))?, @@ -568,6 +599,12 @@ impl PageServerConf { let permits = NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?; ConfigurableSemaphore::new(permits) }), + "metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?), + "metric_collection_endpoint" => { + let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?; + builder.metric_collection_endpoint(Some(endpoint)); + }, + "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?), _ => bail!("unrecognized pageserver option '{key}'"), } @@ -690,6 +727,8 @@ impl PageServerConf { broker_keepalive_interval: Duration::from_secs(5000), log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), + metric_collection_interval: Duration::from_secs(60), + metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT, test_remote_failures: 0, } } @@ -821,6 +860,8 @@ max_file_descriptors = 333 initial_superuser_name = 'zzzz' id = 10 +metric_collection_interval = '222 s' +metric_collection_endpoint = 'http://localhost:80/metrics' log_format = 'json' "#; @@ -864,6 +905,10 @@ log_format = 'json' )?, log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), + metric_collection_interval: humantime::parse_duration( + defaults::DEFAULT_METRIC_COLLECTION_INTERVAL + )?, + metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT, test_remote_failures: 0, }, "Correct defaults should be used when no config values are provided" @@ -909,6 +954,8 @@ log_format = 'json' broker_keepalive_interval: Duration::from_secs(5), log_format: LogFormat::Json, concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), + metric_collection_interval: Duration::from_secs(222), + metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?), test_remote_failures: 0, }, "Should be able to parse all basic config values correctly" diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 5c4804db36..626d5e99e3 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,5 +1,6 @@ mod auth; pub mod basebackup; +pub mod billing_metrics; pub mod config; pub mod http; pub mod import_datadir; diff --git a/pageserver/src/storage_sync2.rs b/pageserver/src/storage_sync2.rs index 14ab332eba..9253b250cd 100644 --- a/pageserver/src/storage_sync2.rs +++ b/pageserver/src/storage_sync2.rs @@ -517,6 +517,10 @@ impl RemoteTimelineClient { self.metrics.remote_physical_size_gauge().set(size); } + pub fn get_remote_physical_size(&self) -> u64 { + self.metrics.remote_physical_size_gauge().get() + } + // // Download operations. // diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 91719fb3af..fe3ad1a57d 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -203,6 +203,9 @@ pub enum TaskKind { // task that handles attaching a tenant Attach, + + // task that handhes metrics collection + MetricsCollection, } #[derive(Default)] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 7a03f52155..0ff5089f66 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -700,6 +700,22 @@ impl Tenant { Ok(()) } + /// get size of all remote timelines + /// + /// This function relies on the index_part instead of listing the remote storage + /// + pub async fn get_remote_size(&self) -> anyhow::Result { + let mut size = 0; + + for timeline in self.list_timelines().iter() { + if let Some(remote_client) = &timeline.remote_client { + size += remote_client.get_remote_physical_size(); + } + } + + Ok(size) + } + #[instrument(skip(self, index_part, remote_metadata, remote_storage), fields(timeline_id=%timeline_id))] async fn load_remote_timeline( &self, diff --git a/poetry.lock b/poetry.lock index 2fa7f03679..f5cbe24954 100644 --- a/poetry.lock +++ b/poetry.lock @@ -11,7 +11,7 @@ async-timeout = ">=3.0,<5.0" psycopg2-binary = ">=2.8.4" [package.extras] -sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"] +sa = ["sqlalchemy[postgresql_psycopg2binary] (>=1.3,<1.5)"] [[package]] name = "allure-pytest" @@ -80,7 +80,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" dev = ["cloudpickle", "coverage[toml] (>=5.0.2)", "furo", "hypothesis", "mypy", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "sphinx", "sphinx-notfound-page", "zope.interface"] docs = ["furo", "sphinx", "sphinx-notfound-page", "zope.interface"] tests = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "zope.interface"] -tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"] +tests_no_zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"] [[package]] name = "aws-sam-translator" @@ -569,7 +569,7 @@ optional = false python-versions = ">=3.6.0" [package.extras] -unicode-backport = ["unicodedata2"] +unicode_backport = ["unicodedata2"] [[package]] name = "click" @@ -747,9 +747,9 @@ python-versions = ">=3.6.1,<4.0" [package.extras] colors = ["colorama (>=0.4.3,<0.5.0)"] -pipfile-deprecated-finder = ["pipreqs", "requirementslib"] +pipfile_deprecated_finder = ["pipreqs", "requirementslib"] plugins = ["setuptools"] -requirements-deprecated-finder = ["pip-api", "pipreqs"] +requirements_deprecated_finder = ["pip-api", "pipreqs"] [[package]] name = "itsdangerous" @@ -824,7 +824,7 @@ python-versions = ">=2.7" [package.extras] docs = ["jaraco.packaging (>=3.2)", "rst.linker (>=1.9)", "sphinx"] testing = ["ecdsa", "enum34", "feedparser", "jsonlib", "numpy", "pandas", "pymongo", "pytest (>=3.5,!=3.7.3)", "pytest-black-multipy", "pytest-checkdocs (>=1.2.3)", "pytest-cov", "pytest-flake8 (<1.1.0)", "pytest-flake8 (>=1.1.1)", "scikit-learn", "sqlalchemy"] -testing-libs = ["simplejson", "ujson", "yajl"] +"testing.libs" = ["simplejson", "ujson", "yajl"] [[package]] name = "jsonpointer" @@ -850,7 +850,7 @@ six = ">=1.11.0" [package.extras] format = ["idna", "jsonpointer (>1.13)", "rfc3987", "strict-rfc3339", "webcolors"] -format-nongpl = ["idna", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "webcolors"] +format_nongpl = ["idna", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "webcolors"] [[package]] name = "junit-xml" @@ -1227,6 +1227,17 @@ pytest = ">=6.1.0" [package.extras] testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy (>=0.931)", "pytest-trio (>=0.7.0)"] +[[package]] +name = "pytest-httpserver" +version = "1.0.6" +description = "pytest-httpserver is a httpserver for pytest" +category = "main" +optional = false +python-versions = ">=3.7,<4.0" + +[package.dependencies] +Werkzeug = ">=2.0.0" + [[package]] name = "pytest-lazy-fixture" version = "0.6.3" @@ -1350,7 +1361,7 @@ urllib3 = ">=1.21.1,<1.27" [package.extras] socks = ["PySocks (>=1.5.6,!=1.5.7)"] -use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +use_chardet_on_py3 = ["chardet (>=3.0.2,<6)"] [[package]] name = "responses" @@ -1583,7 +1594,7 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>= [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "98d63eaa73253882440e0fc8cdb305bb536944768c5ba313c25d0ee65f546544" +content-hash = "55aba66810d5b47d25372c740e4d466e1e791c4d0e665c57a611ab8665563689" [metadata.files] aiopg = [ @@ -2099,7 +2110,18 @@ py = [ {file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"}, ] pyasn1 = [ + {file = "pyasn1-0.4.8-py2.4.egg", hash = "sha256:fec3e9d8e36808a28efb59b489e4528c10ad0f480e57dcc32b4de5c9d8c9fdf3"}, + {file = "pyasn1-0.4.8-py2.5.egg", hash = "sha256:0458773cfe65b153891ac249bcf1b5f8f320b7c2ce462151f8fa74de8934becf"}, + {file = "pyasn1-0.4.8-py2.6.egg", hash = "sha256:5c9414dcfede6e441f7e8f81b43b34e834731003427e5b09e4e00e3172a10f00"}, + {file = "pyasn1-0.4.8-py2.7.egg", hash = "sha256:6e7545f1a61025a4e58bb336952c5061697da694db1cae97b116e9c46abcf7c8"}, {file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"}, + {file = "pyasn1-0.4.8-py3.1.egg", hash = "sha256:78fa6da68ed2727915c4767bb386ab32cdba863caa7dbe473eaae45f9959da86"}, + {file = "pyasn1-0.4.8-py3.2.egg", hash = "sha256:08c3c53b75eaa48d71cf8c710312316392ed40899cb34710d092e96745a358b7"}, + {file = "pyasn1-0.4.8-py3.3.egg", hash = "sha256:03840c999ba71680a131cfaee6fab142e1ed9bbd9c693e285cc6aca0d555e576"}, + {file = "pyasn1-0.4.8-py3.4.egg", hash = "sha256:7ab8a544af125fb704feadb008c99a88805126fb525280b2270bb25cc1d78a12"}, + {file = "pyasn1-0.4.8-py3.5.egg", hash = "sha256:e89bf84b5437b532b0803ba5c9a5e054d21fec423a89952a74f87fa2c9b7bce2"}, + {file = "pyasn1-0.4.8-py3.6.egg", hash = "sha256:014c0e9976956a08139dc0712ae195324a75e142284d5f87f1a87ee1b068a359"}, + {file = "pyasn1-0.4.8-py3.7.egg", hash = "sha256:99fcc3c8d804d1bc6d9a099921e39d827026409a58f2a720dcdb89374ea0c776"}, {file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"}, ] pycodestyle = [ @@ -2157,6 +2179,10 @@ pytest-asyncio = [ {file = "pytest-asyncio-0.19.0.tar.gz", hash = "sha256:ac4ebf3b6207259750bc32f4c1d8fcd7e79739edbc67ad0c58dd150b1d072fed"}, {file = "pytest_asyncio-0.19.0-py3-none-any.whl", hash = "sha256:7a97e37cfe1ed296e2e84941384bdd37c376453912d397ed39293e0916f521fa"}, ] +pytest-httpserver = [ + {file = "pytest_httpserver-1.0.6-py3-none-any.whl", hash = "sha256:ac2379acc91fe8bdbe2911c93af8dd130e33b5899fb9934d15669480739c6d32"}, + {file = "pytest_httpserver-1.0.6.tar.gz", hash = "sha256:9040d07bf59ac45d8de3db1d4468fd2d1d607975e4da4c872ecc0402cdbf7b3e"}, +] pytest-lazy-fixture = [ {file = "pytest-lazy-fixture-0.6.3.tar.gz", hash = "sha256:0e7d0c7f74ba33e6e80905e9bfd81f9d15ef9a790de97993e34213deb5ad10ac"}, {file = "pytest_lazy_fixture-0.6.3-py3-none-any.whl", hash = "sha256:e0b379f38299ff27a653f03eaa69b08a6fd4484e46fd1c9907d984b9f9daeda6"}, diff --git a/pyproject.toml b/pyproject.toml index b297f7f70b..4819ece4b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ toml = "^0.10.2" psutil = "^5.9.4" types-psutil = "^5.9.5.4" types-toml = "^0.10.8" +pytest-httpserver = "^1.0.6" [tool.poetry.dev-dependencies] flake8 = "^5.0.4" diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py new file mode 100644 index 0000000000..7f86d92962 --- /dev/null +++ b/test_runner/regress/test_metric_collection.py @@ -0,0 +1,138 @@ +import pytest +from fixtures.log_helper import log +from fixtures.metrics import parse_metrics +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + PortDistributor, + RemoteStorageKind, + wait_for_last_flush_lsn, +) +from fixtures.types import TenantId, TimelineId +from fixtures.utils import query_scalar +from pytest_httpserver import HTTPServer +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response + + +@pytest.fixture(scope="session") +def httpserver_listen_address(port_distributor: PortDistributor): + port = port_distributor.get_port() + return ("localhost", port) + + +num_metrics_received = 0 +remote_uploaded = 0 + + +# +# verify that metrics look minilally sane +# +def metrics_handler(request: Request) -> Response: + if request.json is None: + return Response(status=400) + + events = request.json["events"] + log.info("received events:") + log.info(events) + + checks = { + "written_size": lambda value: value > 0, + "physical_size": lambda value: value >= 0, + # >= 0 check here is to avoid race condition when we receive metrics before + # remote_uploaded is updated + "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0, + } + + for event in events: + assert checks.pop(event["metric"])(event["value"]), f"{event['metric']} isn't valid" + + assert not checks, f"{' '.join(checks.keys())} wasn't/weren't received" + + global num_metrics_received + num_metrics_received += 1 + return Response(status=200) + + +@pytest.mark.parametrize( + "remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS] +) +def test_metric_collection( + httpserver: HTTPServer, + neon_env_builder: NeonEnvBuilder, + httpserver_listen_address, + remote_storage_kind: RemoteStorageKind, +): + (host, port) = httpserver_listen_address + metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events" + + # Disable time-based pitr, we will use the manual GC calls + # to trigger remote storage operations in a controlled way + neon_env_builder.pageserver_config_override = ( + f""" + metric_collection_endpoint="{metric_collection_endpoint}" + """ + + "tenant_config={pitr_interval = '0 sec'}" + ) + + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_metric_collection", + ) + + log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}") + + # mock http server that returns OK for the metrics + httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler( + metrics_handler + ) + + # spin up neon, after http server is ready + env = neon_env_builder.init_start() + env.neon_cli.create_branch("test_metric_collection") + pg = env.postgres.create_start("test_metric_collection") + + pg_conn = pg.connect() + cur = pg_conn.cursor() + + tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id")) + timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id")) + + cur.execute("CREATE TABLE foo (id int, counter int, t text)") + cur.execute( + """ + INSERT INTO foo + SELECT g, 0, 'long string to consume some space' || g + FROM generate_series(1, 100000) g + """ + ) + + # Helper function that gets the number of given kind of remote ops from the metrics + def get_num_remote_ops(file_kind: str, op_kind: str) -> int: + ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver") + total = 0.0 + for sample in ps_metrics.query_all( + name="pageserver_remote_operation_seconds_count", + filter={ + "tenant_id": str(tenant_id), + "timeline_id": str(timeline_id), + "file_kind": str(file_kind), + "op_kind": str(op_kind), + }, + ): + total += sample[2] + return int(total) + + # upload some data to remote storage + if remote_storage_kind == RemoteStorageKind.LOCAL_FS: + wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + pageserver_http = env.pageserver.http_client() + pageserver_http.timeline_checkpoint(tenant_id, timeline_id) + pageserver_http.timeline_gc(tenant_id, timeline_id, 10000) + global remote_uploaded + remote_uploaded = get_num_remote_ops("index", "upload") + assert remote_uploaded > 0 + + # check that all requests are served + httpserver.check() + global num_metrics_received + assert num_metrics_received > 0, "no metrics were received" diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index de9a26513d..6c81756fe1 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -16,6 +16,7 @@ publish = false ahash = { version = "0.7", features = ["std"] } anyhow = { version = "1", features = ["backtrace", "std"] } bytes = { version = "1", features = ["serde", "std"] } +chrono = { version = "0.4", default-features = false, features = ["clock", "iana-time-zone", "serde", "std", "winapi"] } clap = { version = "4", features = ["color", "derive", "error-context", "help", "std", "string", "suggestions", "usage"] } crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] } either = { version = "1", features = ["use_std"] } @@ -36,9 +37,10 @@ prost = { version = "0.11", features = ["prost-derive", "std"] } rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } -reqwest = { version = "0.11", default-features = false, features = ["__rustls", "__tls", "blocking", "default-tls", "hyper-rustls", "hyper-tls", "json", "native-tls-crate", "rustls", "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "tokio-native-tls", "tokio-rustls", "webpki-roots"] } +reqwest = { version = "0.11", features = ["__rustls", "__tls", "blocking", "default-tls", "hyper-rustls", "hyper-tls", "json", "native-tls-crate", "rustls", "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "tokio-native-tls", "tokio-rustls", "webpki-roots"] } scopeguard = { version = "1", features = ["use_std"] } serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] } +serde_json = { version = "1", features = ["raw_value", "std"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } stable_deref_trait = { version = "1", features = ["alloc", "std"] } tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros"] }