mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 16:32:56 +00:00
Periodically send cached consumption metrics.
Add new `cached_metric_collection_interval` pageserver config setting. This setting controls how often unchanged cached consumption metrics are sent to the HTTP endpoint. This is a workaround for billing service limitations.
This commit is contained in:
@@ -347,6 +347,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
|||||||
pageserver::consumption_metrics::collect_metrics(
|
pageserver::consumption_metrics::collect_metrics(
|
||||||
metric_collection_endpoint,
|
metric_collection_endpoint,
|
||||||
conf.metric_collection_interval,
|
conf.metric_collection_interval,
|
||||||
|
conf.cached_metric_collection_interval,
|
||||||
conf.synthetic_size_calculation_interval,
|
conf.synthetic_size_calculation_interval,
|
||||||
conf.id,
|
conf.id,
|
||||||
metrics_ctx,
|
metrics_ctx,
|
||||||
|
|||||||
@@ -58,6 +58,7 @@ pub mod defaults {
|
|||||||
super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
|
super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
|
||||||
|
|
||||||
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
|
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
|
||||||
|
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
|
||||||
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
|
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
|
||||||
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
|
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
|
||||||
|
|
||||||
@@ -85,6 +86,7 @@ pub mod defaults {
|
|||||||
#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
|
#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
|
||||||
|
|
||||||
#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
|
#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
|
||||||
|
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
|
||||||
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
|
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
|
||||||
|
|
||||||
# [tenant_config]
|
# [tenant_config]
|
||||||
@@ -154,6 +156,8 @@ pub struct PageServerConf {
|
|||||||
|
|
||||||
// How often to collect metrics and send them to the metrics endpoint.
|
// How often to collect metrics and send them to the metrics endpoint.
|
||||||
pub metric_collection_interval: Duration,
|
pub metric_collection_interval: Duration,
|
||||||
|
// How often to send unchanged cached metrics to the metrics endpoint.
|
||||||
|
pub cached_metric_collection_interval: Duration,
|
||||||
pub metric_collection_endpoint: Option<Url>,
|
pub metric_collection_endpoint: Option<Url>,
|
||||||
pub synthetic_size_calculation_interval: Duration,
|
pub synthetic_size_calculation_interval: Duration,
|
||||||
|
|
||||||
@@ -220,6 +224,7 @@ struct PageServerConfigBuilder {
|
|||||||
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
|
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
|
||||||
|
|
||||||
metric_collection_interval: BuilderValue<Duration>,
|
metric_collection_interval: BuilderValue<Duration>,
|
||||||
|
cached_metric_collection_interval: BuilderValue<Duration>,
|
||||||
metric_collection_endpoint: BuilderValue<Option<Url>>,
|
metric_collection_endpoint: BuilderValue<Option<Url>>,
|
||||||
synthetic_size_calculation_interval: BuilderValue<Duration>,
|
synthetic_size_calculation_interval: BuilderValue<Duration>,
|
||||||
|
|
||||||
@@ -264,6 +269,10 @@ impl Default for PageServerConfigBuilder {
|
|||||||
DEFAULT_METRIC_COLLECTION_INTERVAL,
|
DEFAULT_METRIC_COLLECTION_INTERVAL,
|
||||||
)
|
)
|
||||||
.expect("cannot parse default metric collection interval")),
|
.expect("cannot parse default metric collection interval")),
|
||||||
|
cached_metric_collection_interval: Set(humantime::parse_duration(
|
||||||
|
DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
|
||||||
|
)
|
||||||
|
.expect("cannot parse default cached_metric_collection_interval")),
|
||||||
synthetic_size_calculation_interval: Set(humantime::parse_duration(
|
synthetic_size_calculation_interval: Set(humantime::parse_duration(
|
||||||
DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
|
DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
|
||||||
)
|
)
|
||||||
@@ -353,6 +362,14 @@ impl PageServerConfigBuilder {
|
|||||||
self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
|
self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn cached_metric_collection_interval(
|
||||||
|
&mut self,
|
||||||
|
cached_metric_collection_interval: Duration,
|
||||||
|
) {
|
||||||
|
self.cached_metric_collection_interval =
|
||||||
|
BuilderValue::Set(cached_metric_collection_interval)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
|
pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
|
||||||
self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
|
self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
|
||||||
}
|
}
|
||||||
@@ -427,6 +444,9 @@ impl PageServerConfigBuilder {
|
|||||||
metric_collection_interval: self
|
metric_collection_interval: self
|
||||||
.metric_collection_interval
|
.metric_collection_interval
|
||||||
.ok_or(anyhow!("missing metric_collection_interval"))?,
|
.ok_or(anyhow!("missing metric_collection_interval"))?,
|
||||||
|
cached_metric_collection_interval: self
|
||||||
|
.cached_metric_collection_interval
|
||||||
|
.ok_or(anyhow!("missing cached_metric_collection_interval"))?,
|
||||||
metric_collection_endpoint: self
|
metric_collection_endpoint: self
|
||||||
.metric_collection_endpoint
|
.metric_collection_endpoint
|
||||||
.ok_or(anyhow!("missing metric_collection_endpoint"))?,
|
.ok_or(anyhow!("missing metric_collection_endpoint"))?,
|
||||||
@@ -612,6 +632,7 @@ impl PageServerConf {
|
|||||||
ConfigurableSemaphore::new(permits)
|
ConfigurableSemaphore::new(permits)
|
||||||
}),
|
}),
|
||||||
"metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
|
"metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
|
||||||
|
"cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
|
||||||
"metric_collection_endpoint" => {
|
"metric_collection_endpoint" => {
|
||||||
let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
|
let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
|
||||||
builder.metric_collection_endpoint(Some(endpoint));
|
builder.metric_collection_endpoint(Some(endpoint));
|
||||||
@@ -741,6 +762,7 @@ impl PageServerConf {
|
|||||||
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
|
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
|
||||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||||
metric_collection_interval: Duration::from_secs(60),
|
metric_collection_interval: Duration::from_secs(60),
|
||||||
|
cached_metric_collection_interval: Duration::from_secs(60 * 60),
|
||||||
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
||||||
synthetic_size_calculation_interval: Duration::from_secs(60),
|
synthetic_size_calculation_interval: Duration::from_secs(60),
|
||||||
test_remote_failures: 0,
|
test_remote_failures: 0,
|
||||||
@@ -881,6 +903,7 @@ initial_superuser_name = 'zzzz'
|
|||||||
id = 10
|
id = 10
|
||||||
|
|
||||||
metric_collection_interval = '222 s'
|
metric_collection_interval = '222 s'
|
||||||
|
cached_metric_collection_interval = '22200 s'
|
||||||
metric_collection_endpoint = 'http://localhost:80/metrics'
|
metric_collection_endpoint = 'http://localhost:80/metrics'
|
||||||
synthetic_size_calculation_interval = '333 s'
|
synthetic_size_calculation_interval = '333 s'
|
||||||
log_format = 'json'
|
log_format = 'json'
|
||||||
@@ -928,6 +951,9 @@ log_format = 'json'
|
|||||||
metric_collection_interval: humantime::parse_duration(
|
metric_collection_interval: humantime::parse_duration(
|
||||||
defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
|
defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
|
||||||
)?,
|
)?,
|
||||||
|
cached_metric_collection_interval: humantime::parse_duration(
|
||||||
|
defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL
|
||||||
|
)?,
|
||||||
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
||||||
synthetic_size_calculation_interval: humantime::parse_duration(
|
synthetic_size_calculation_interval: humantime::parse_duration(
|
||||||
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
|
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
|
||||||
@@ -978,6 +1004,7 @@ log_format = 'json'
|
|||||||
log_format: LogFormat::Json,
|
log_format: LogFormat::Json,
|
||||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||||
metric_collection_interval: Duration::from_secs(222),
|
metric_collection_interval: Duration::from_secs(222),
|
||||||
|
cached_metric_collection_interval: Duration::from_secs(22200),
|
||||||
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
|
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
|
||||||
synthetic_size_calculation_interval: Duration::from_secs(333),
|
synthetic_size_calculation_interval: Duration::from_secs(333),
|
||||||
test_remote_failures: 0,
|
test_remote_failures: 0,
|
||||||
|
|||||||
@@ -46,12 +46,14 @@ pub struct PageserverConsumptionMetricsKey {
|
|||||||
pub async fn collect_metrics(
|
pub async fn collect_metrics(
|
||||||
metric_collection_endpoint: &Url,
|
metric_collection_endpoint: &Url,
|
||||||
metric_collection_interval: Duration,
|
metric_collection_interval: Duration,
|
||||||
|
cached_metric_collection_interval: Duration,
|
||||||
synthetic_size_calculation_interval: Duration,
|
synthetic_size_calculation_interval: Duration,
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
ctx: RequestContext,
|
ctx: RequestContext,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut ticker = tokio::time::interval(metric_collection_interval);
|
let mut ticker = tokio::time::interval(metric_collection_interval);
|
||||||
|
let send_cached_every_iter =
|
||||||
|
cached_metric_collection_interval.as_secs() / metric_collection_interval.as_secs();
|
||||||
info!("starting collect_metrics");
|
info!("starting collect_metrics");
|
||||||
|
|
||||||
// spin up background worker that caclulates tenant sizes
|
// spin up background worker that caclulates tenant sizes
|
||||||
@@ -75,6 +77,7 @@ pub async fn collect_metrics(
|
|||||||
// define client here to reuse it for all requests
|
// define client here to reuse it for all requests
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::new();
|
||||||
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
|
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
|
||||||
|
let mut iter_num: u64 = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@@ -83,6 +86,13 @@ pub async fn collect_metrics(
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
},
|
},
|
||||||
_ = ticker.tick() => {
|
_ = ticker.tick() => {
|
||||||
|
// send cached metrics every send_cached_every_iter iterations
|
||||||
|
iter_num += 1;
|
||||||
|
let send_cached = iter_num >= send_cached_every_iter;
|
||||||
|
if send_cached {
|
||||||
|
iter_num = 0;
|
||||||
|
}
|
||||||
|
|
||||||
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx).await;
|
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -105,6 +115,7 @@ pub async fn collect_metrics_iteration(
|
|||||||
metric_collection_endpoint: &reqwest::Url,
|
metric_collection_endpoint: &reqwest::Url,
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
|
send_cached: bool,
|
||||||
) {
|
) {
|
||||||
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
|
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
|
||||||
trace!(
|
trace!(
|
||||||
@@ -222,11 +233,13 @@ pub async fn collect_metrics_iteration(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter metrics
|
// Filter metrics, unless we want to send all metrics, including cached ones.
|
||||||
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
|
if !send_cached {
|
||||||
Some(val) => val != curr_val,
|
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
|
||||||
None => true,
|
Some(val) => val != curr_val,
|
||||||
});
|
None => true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
if current_metrics.is_empty() {
|
if current_metrics.is_empty() {
|
||||||
trace!("no new metrics to send");
|
trace!("no new metrics to send");
|
||||||
|
|||||||
Reference in New Issue
Block a user