Periodically send cached consumption metrics.

Add new `cached_metric_collection_interval` pageserver config setting.
This setting controls how often unchanged cached consumption metrics are sent to
the HTTP endpoint.

This is a workaround for billing service limitations.
This commit is contained in:
Anastasia Lubennikova
2023-02-01 21:32:49 +02:00
parent a0372158a0
commit a69da4a754
3 changed files with 47 additions and 6 deletions

View File

@@ -347,6 +347,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
metrics_ctx,

View File

@@ -58,6 +58,7 @@ pub mod defaults {
super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
@@ -85,6 +86,7 @@ pub mod defaults {
#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
# [tenant_config]
@@ -154,6 +156,8 @@ pub struct PageServerConf {
// How often to collect metrics and send them to the metrics endpoint.
pub metric_collection_interval: Duration,
// How often to send unchanged cached metrics to the metrics endpoint.
pub cached_metric_collection_interval: Duration,
pub metric_collection_endpoint: Option<Url>,
pub synthetic_size_calculation_interval: Duration,
@@ -220,6 +224,7 @@ struct PageServerConfigBuilder {
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
metric_collection_interval: BuilderValue<Duration>,
cached_metric_collection_interval: BuilderValue<Duration>,
metric_collection_endpoint: BuilderValue<Option<Url>>,
synthetic_size_calculation_interval: BuilderValue<Duration>,
@@ -264,6 +269,10 @@ impl Default for PageServerConfigBuilder {
DEFAULT_METRIC_COLLECTION_INTERVAL,
)
.expect("cannot parse default metric collection interval")),
cached_metric_collection_interval: Set(humantime::parse_duration(
DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
)
.expect("cannot parse default cached_metric_collection_interval")),
synthetic_size_calculation_interval: Set(humantime::parse_duration(
DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
)
@@ -353,6 +362,14 @@ impl PageServerConfigBuilder {
self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
}
pub fn cached_metric_collection_interval(
&mut self,
cached_metric_collection_interval: Duration,
) {
self.cached_metric_collection_interval =
BuilderValue::Set(cached_metric_collection_interval)
}
pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
}
@@ -427,6 +444,9 @@ impl PageServerConfigBuilder {
metric_collection_interval: self
.metric_collection_interval
.ok_or(anyhow!("missing metric_collection_interval"))?,
cached_metric_collection_interval: self
.cached_metric_collection_interval
.ok_or(anyhow!("missing cached_metric_collection_interval"))?,
metric_collection_endpoint: self
.metric_collection_endpoint
.ok_or(anyhow!("missing metric_collection_endpoint"))?,
@@ -612,6 +632,7 @@ impl PageServerConf {
ConfigurableSemaphore::new(permits)
}),
"metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
"cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
"metric_collection_endpoint" => {
let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
builder.metric_collection_endpoint(Some(endpoint));
@@ -741,6 +762,7 @@ impl PageServerConf {
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
metric_collection_interval: Duration::from_secs(60),
cached_metric_collection_interval: Duration::from_secs(60 * 60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: Duration::from_secs(60),
test_remote_failures: 0,
@@ -881,6 +903,7 @@ initial_superuser_name = 'zzzz'
id = 10
metric_collection_interval = '222 s'
cached_metric_collection_interval = '22200 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'
log_format = 'json'
@@ -928,6 +951,9 @@ log_format = 'json'
metric_collection_interval: humantime::parse_duration(
defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
)?,
cached_metric_collection_interval: humantime::parse_duration(
defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL
)?,
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: humantime::parse_duration(
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
@@ -978,6 +1004,7 @@ log_format = 'json'
log_format: LogFormat::Json,
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
metric_collection_interval: Duration::from_secs(222),
cached_metric_collection_interval: Duration::from_secs(22200),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
synthetic_size_calculation_interval: Duration::from_secs(333),
test_remote_failures: 0,

View File

@@ -46,12 +46,14 @@ pub struct PageserverConsumptionMetricsKey {
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
metric_collection_interval: Duration,
cached_metric_collection_interval: Duration,
synthetic_size_calculation_interval: Duration,
node_id: NodeId,
ctx: RequestContext,
) -> anyhow::Result<()> {
let mut ticker = tokio::time::interval(metric_collection_interval);
let send_cached_every_iter =
cached_metric_collection_interval.as_secs() / metric_collection_interval.as_secs();
info!("starting collect_metrics");
// spin up background worker that caclulates tenant sizes
@@ -75,6 +77,7 @@ pub async fn collect_metrics(
// define client here to reuse it for all requests
let client = reqwest::Client::new();
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
let mut iter_num: u64 = 0;
loop {
tokio::select! {
@@ -83,6 +86,13 @@ pub async fn collect_metrics(
return Ok(());
},
_ = ticker.tick() => {
// send cached metrics every send_cached_every_iter iterations
iter_num += 1;
let send_cached = iter_num >= send_cached_every_iter;
if send_cached {
iter_num = 0;
}
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx).await;
}
}
@@ -105,6 +115,7 @@ pub async fn collect_metrics_iteration(
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
ctx: &RequestContext,
send_cached: bool,
) {
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
trace!(
@@ -222,11 +233,13 @@ pub async fn collect_metrics_iteration(
));
}
// Filter metrics
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
Some(val) => val != curr_val,
None => true,
});
// Filter metrics, unless we want to send all metrics, including cached ones.
if !send_cached {
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
Some(val) => val != curr_val,
None => true,
});
}
if current_metrics.is_empty() {
trace!("no new metrics to send");