mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
feat(consumption_metrics): remove event deduplication support (#5316)
We no longer use pageserver deduplication anywhere. Give out a warning instead. Split off from #5297. Cc: #5175 for dedup.
This commit is contained in:
@@ -64,7 +64,7 @@ pub mod defaults {
|
||||
super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
|
||||
|
||||
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
|
||||
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
|
||||
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "0s";
|
||||
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
|
||||
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
|
||||
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
|
||||
|
||||
@@ -221,11 +221,16 @@ type Cache = HashMap<MetricsKey, (EventType, u64)>;
|
||||
pub async fn collect_metrics(
|
||||
metric_collection_endpoint: &Url,
|
||||
metric_collection_interval: Duration,
|
||||
cached_metric_collection_interval: Duration,
|
||||
_cached_metric_collection_interval: Duration,
|
||||
synthetic_size_calculation_interval: Duration,
|
||||
node_id: NodeId,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
if _cached_metric_collection_interval != Duration::ZERO {
|
||||
tracing::warn!(
|
||||
"cached_metric_collection_interval is no longer used, please set it to zero."
|
||||
)
|
||||
}
|
||||
let mut ticker = tokio::time::interval(metric_collection_interval);
|
||||
info!("starting collect_metrics");
|
||||
|
||||
@@ -253,7 +258,6 @@ pub async fn collect_metrics(
|
||||
.build()
|
||||
.expect("Failed to create http client with timeout");
|
||||
let mut cached_metrics = HashMap::new();
|
||||
let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -263,14 +267,7 @@ pub async fn collect_metrics(
|
||||
},
|
||||
tick_at = ticker.tick() => {
|
||||
|
||||
// send cached metrics every cached_metric_collection_interval
|
||||
let send_cached = prev_iteration_time.elapsed() >= cached_metric_collection_interval;
|
||||
|
||||
if send_cached {
|
||||
prev_iteration_time = std::time::Instant::now();
|
||||
}
|
||||
|
||||
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx, send_cached).await;
|
||||
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx).await;
|
||||
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
tick_at.elapsed(),
|
||||
@@ -298,7 +295,6 @@ async fn collect_metrics_iteration(
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
node_id: NodeId,
|
||||
ctx: &RequestContext,
|
||||
send_cached: bool,
|
||||
) {
|
||||
trace!(
|
||||
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
|
||||
@@ -325,24 +321,7 @@ async fn collect_metrics_iteration(
|
||||
}
|
||||
});
|
||||
|
||||
let mut current_metrics = collect(tenants, cached_metrics, ctx).await;
|
||||
|
||||
// Filter metrics, unless we want to send all metrics, including cached ones.
|
||||
// See: https://github.com/neondatabase/neon/issues/3485
|
||||
if !send_cached {
|
||||
current_metrics.retain(|(curr_key, (kind, curr_val))| {
|
||||
if kind.is_incremental() {
|
||||
// incremental values (currently only written_size_delta) should not get any cache
|
||||
// deduplication because they will be used by upstream for "is still alive."
|
||||
true
|
||||
} else {
|
||||
match cached_metrics.get(curr_key) {
|
||||
Some((_, val)) => val != curr_val,
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
let current_metrics = collect(tenants, cached_metrics, ctx).await;
|
||||
|
||||
if current_metrics.is_empty() {
|
||||
trace!("no new metrics to send");
|
||||
|
||||
Reference in New Issue
Block a user