Periodically send cached consumption metrics.

Add new `cached_metric_collection_interval` pageserver config setting.
This setting controls how often unchanged cached consumption metrics are sent to
the HTTP endpoint.

This is a workaround for billing service limitations.
This commit is contained in:
Anastasia Lubennikova
2023-02-01 21:32:49 +02:00
parent a0372158a0
commit a69da4a754
3 changed files with 47 additions and 6 deletions

View File

@@ -347,6 +347,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
pageserver::consumption_metrics::collect_metrics( pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint, metric_collection_endpoint,
conf.metric_collection_interval, conf.metric_collection_interval,
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval, conf.synthetic_size_calculation_interval,
conf.id, conf.id,
metrics_ctx, metrics_ctx,

View File

@@ -58,6 +58,7 @@ pub mod defaults {
super::ConfigurableSemaphore::DEFAULT_INITIAL.get(); super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min"; pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None; pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min"; pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
@@ -85,6 +86,7 @@ pub mod defaults {
#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}' #concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}' #metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}' #synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
# [tenant_config] # [tenant_config]
@@ -154,6 +156,8 @@ pub struct PageServerConf {
// How often to collect metrics and send them to the metrics endpoint. // How often to collect metrics and send them to the metrics endpoint.
pub metric_collection_interval: Duration, pub metric_collection_interval: Duration,
// How often to send unchanged cached metrics to the metrics endpoint.
pub cached_metric_collection_interval: Duration,
pub metric_collection_endpoint: Option<Url>, pub metric_collection_endpoint: Option<Url>,
pub synthetic_size_calculation_interval: Duration, pub synthetic_size_calculation_interval: Duration,
@@ -220,6 +224,7 @@ struct PageServerConfigBuilder {
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>, concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
metric_collection_interval: BuilderValue<Duration>, metric_collection_interval: BuilderValue<Duration>,
cached_metric_collection_interval: BuilderValue<Duration>,
metric_collection_endpoint: BuilderValue<Option<Url>>, metric_collection_endpoint: BuilderValue<Option<Url>>,
synthetic_size_calculation_interval: BuilderValue<Duration>, synthetic_size_calculation_interval: BuilderValue<Duration>,
@@ -264,6 +269,10 @@ impl Default for PageServerConfigBuilder {
DEFAULT_METRIC_COLLECTION_INTERVAL, DEFAULT_METRIC_COLLECTION_INTERVAL,
) )
.expect("cannot parse default metric collection interval")), .expect("cannot parse default metric collection interval")),
cached_metric_collection_interval: Set(humantime::parse_duration(
DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
)
.expect("cannot parse default cached_metric_collection_interval")),
synthetic_size_calculation_interval: Set(humantime::parse_duration( synthetic_size_calculation_interval: Set(humantime::parse_duration(
DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL, DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
) )
@@ -353,6 +362,14 @@ impl PageServerConfigBuilder {
self.metric_collection_interval = BuilderValue::Set(metric_collection_interval) self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
} }
pub fn cached_metric_collection_interval(
&mut self,
cached_metric_collection_interval: Duration,
) {
self.cached_metric_collection_interval =
BuilderValue::Set(cached_metric_collection_interval)
}
pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) { pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint) self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
} }
@@ -427,6 +444,9 @@ impl PageServerConfigBuilder {
metric_collection_interval: self metric_collection_interval: self
.metric_collection_interval .metric_collection_interval
.ok_or(anyhow!("missing metric_collection_interval"))?, .ok_or(anyhow!("missing metric_collection_interval"))?,
cached_metric_collection_interval: self
.cached_metric_collection_interval
.ok_or(anyhow!("missing cached_metric_collection_interval"))?,
metric_collection_endpoint: self metric_collection_endpoint: self
.metric_collection_endpoint .metric_collection_endpoint
.ok_or(anyhow!("missing metric_collection_endpoint"))?, .ok_or(anyhow!("missing metric_collection_endpoint"))?,
@@ -612,6 +632,7 @@ impl PageServerConf {
ConfigurableSemaphore::new(permits) ConfigurableSemaphore::new(permits)
}), }),
"metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?), "metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
"cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
"metric_collection_endpoint" => { "metric_collection_endpoint" => {
let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?; let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
builder.metric_collection_endpoint(Some(endpoint)); builder.metric_collection_endpoint(Some(endpoint));
@@ -741,6 +762,7 @@ impl PageServerConf {
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
metric_collection_interval: Duration::from_secs(60), metric_collection_interval: Duration::from_secs(60),
cached_metric_collection_interval: Duration::from_secs(60 * 60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT, metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: Duration::from_secs(60), synthetic_size_calculation_interval: Duration::from_secs(60),
test_remote_failures: 0, test_remote_failures: 0,
@@ -881,6 +903,7 @@ initial_superuser_name = 'zzzz'
id = 10 id = 10
metric_collection_interval = '222 s' metric_collection_interval = '222 s'
cached_metric_collection_interval = '22200 s'
metric_collection_endpoint = 'http://localhost:80/metrics' metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s' synthetic_size_calculation_interval = '333 s'
log_format = 'json' log_format = 'json'
@@ -928,6 +951,9 @@ log_format = 'json'
metric_collection_interval: humantime::parse_duration( metric_collection_interval: humantime::parse_duration(
defaults::DEFAULT_METRIC_COLLECTION_INTERVAL defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
)?, )?,
cached_metric_collection_interval: humantime::parse_duration(
defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL
)?,
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT, metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: humantime::parse_duration( synthetic_size_calculation_interval: humantime::parse_duration(
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
@@ -978,6 +1004,7 @@ log_format = 'json'
log_format: LogFormat::Json, log_format: LogFormat::Json,
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
metric_collection_interval: Duration::from_secs(222), metric_collection_interval: Duration::from_secs(222),
cached_metric_collection_interval: Duration::from_secs(22200),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?), metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
synthetic_size_calculation_interval: Duration::from_secs(333), synthetic_size_calculation_interval: Duration::from_secs(333),
test_remote_failures: 0, test_remote_failures: 0,

View File

@@ -46,12 +46,14 @@ pub struct PageserverConsumptionMetricsKey {
pub async fn collect_metrics( pub async fn collect_metrics(
metric_collection_endpoint: &Url, metric_collection_endpoint: &Url,
metric_collection_interval: Duration, metric_collection_interval: Duration,
cached_metric_collection_interval: Duration,
synthetic_size_calculation_interval: Duration, synthetic_size_calculation_interval: Duration,
node_id: NodeId, node_id: NodeId,
ctx: RequestContext, ctx: RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut ticker = tokio::time::interval(metric_collection_interval); let mut ticker = tokio::time::interval(metric_collection_interval);
let send_cached_every_iter =
cached_metric_collection_interval.as_secs() / metric_collection_interval.as_secs();
info!("starting collect_metrics"); info!("starting collect_metrics");
// spin up background worker that caclulates tenant sizes // spin up background worker that caclulates tenant sizes
@@ -75,6 +77,7 @@ pub async fn collect_metrics(
// define client here to reuse it for all requests // define client here to reuse it for all requests
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new(); let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
let mut iter_num: u64 = 0;
loop { loop {
tokio::select! { tokio::select! {
@@ -83,6 +86,13 @@ pub async fn collect_metrics(
return Ok(()); return Ok(());
}, },
_ = ticker.tick() => { _ = ticker.tick() => {
// send cached metrics every send_cached_every_iter iterations
iter_num += 1;
let send_cached = iter_num >= send_cached_every_iter;
if send_cached {
iter_num = 0;
}
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx).await; collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx).await;
} }
} }
@@ -105,6 +115,7 @@ pub async fn collect_metrics_iteration(
metric_collection_endpoint: &reqwest::Url, metric_collection_endpoint: &reqwest::Url,
node_id: NodeId, node_id: NodeId,
ctx: &RequestContext, ctx: &RequestContext,
send_cached: bool,
) { ) {
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new(); let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
trace!( trace!(
@@ -222,11 +233,13 @@ pub async fn collect_metrics_iteration(
)); ));
} }
// Filter metrics // Filter metrics, unless we want to send all metrics, including cached ones.
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) { if !send_cached {
Some(val) => val != curr_val, current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
None => true, Some(val) => val != curr_val,
}); None => true,
});
}
if current_metrics.is_empty() { if current_metrics.is_empty() {
trace!("no new metrics to send"); trace!("no new metrics to send");