refactor(consumption_metrics): pre-split cleanup (#5325)

Cleanups in preparation for splitting consumption_metrics.rs in #5326.

Split off from #5297.
Author: Joonas Koivunen
Date: 2023-09-16 18:08:33 +03:00 (committed by GitHub)
Parent: 74d99b5883
Commit: 9cf4ae86ff


@@ -1,8 +1,5 @@
//!
//! Periodically collect consumption metrics for all active tenants
//! and push them to an HTTP endpoint.
//! Cache metrics to send only the updated ones.
//!
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{mgr, LogicalSizeCalculationCause};
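
The module doc above summarizes the job: on a timer, sample consumption metrics for every active tenant, push them to an HTTP endpoint, and keep a cache so only new or changed values are re-sent. As orientation for the diff below, here is a minimal sketch of that collect/diff/push loop; `Key`, `Value`, `collect`, and `push` are simplified placeholders, not the module's actual `MetricsKey`/`EventType` cache or its reqwest-based uploader.

```rust
use std::collections::HashMap;
use std::time::Duration;

// Simplified stand-ins; the real module keys its cache by `MetricsKey` and
// stores `(EventType, u64)` values.
type Key = String;
type Value = u64;

async fn collect() -> HashMap<Key, Value> {
    // Placeholder for sampling metrics from all active tenants.
    HashMap::new()
}

async fn push(_changed: &HashMap<Key, Value>) {
    // Placeholder for POSTing the changed values to the reporting endpoint.
}

async fn collect_loop(period: Duration) {
    let mut cached: HashMap<Key, Value> = HashMap::new();
    let mut ticker = tokio::time::interval(period);

    loop {
        ticker.tick().await;

        // Keep only values that are new or changed since the last send.
        let mut changed = HashMap::new();
        for (key, value) in collect().await {
            if cached.get(&key) != Some(&value) {
                changed.insert(key, value);
            }
        }

        if changed.is_empty() {
            continue;
        }

        push(&changed).await;
        cached.extend(changed);
    }
}
```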
@@ -264,6 +261,14 @@ pub async fn collect_metrics(
let final_path: Arc<PathBuf> = Arc::new(local_disk_storage);
let cancel = task_mgr::shutdown_token();
let restore_and_reschedule = restore_and_reschedule(&final_path, metric_collection_interval);
let mut cached_metrics = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
ret = restore_and_reschedule => ret,
};
// define client here to reuse it for all requests
let client = reqwest::ClientBuilder::new()
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
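
The lines added in this hunk change startup: `collect_metrics` now obtains its initial cache by racing the new `restore_and_reschedule` future against the shutdown token, so a shutdown during restore returns immediately instead of waiting. A minimal sketch of that select-on-cancellation pattern, using a `CancellationToken` (assumed here to be `tokio_util`'s, which the `cancel: &CancellationToken` parameter in the diff suggests); `restore` is a hypothetical stand-in for the restore step.

```rust
use tokio_util::sync::CancellationToken;

// Race a cancellation-safe startup step against shutdown; return None if
// shutdown wins. `restore` stands in for restoring previously persisted state.
async fn restore_or_shutdown(cancel: &CancellationToken) -> Option<Vec<u8>> {
    let restore = async {
        // ... read previously persisted state from disk ...
        Vec::<u8>::new()
    };

    tokio::select! {
        // Shutdown was requested first: give up on restoring.
        _ = cancel.cancelled() => None,
        // Restore finished first: continue with the recovered state.
        state = restore => Some(state),
    }
}
```

In the diff itself the cancelled arm returns `Ok(())` from `collect_metrics`, and the other arm yields the restored `Cache`.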
@@ -271,77 +276,111 @@ pub async fn collect_metrics(
.expect("Failed to create http client with timeout");
let node_id = node_id.to_string();
let cancel = task_mgr::shutdown_token();
let (mut cached_metrics, oldest_metric_captured_at) =
match read_metrics_from_disk(final_path.clone()).await {
Ok(found_some) => {
// there is no min needed because we write these sequentially in
// collect_all_metrics
let oldest_metric_captured_at = found_some
.iter()
.map(|(_, (et, _))| et.recorded_at())
.copied()
.next();
// reminder: ticker is ready immediately
let mut ticker = tokio::time::interval(metric_collection_interval);
let cached = found_some
.into_iter()
.collect::<HashMap<MetricsKey, (EventType, u64)>>();
loop {
let tick_at = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};
(cached, oldest_metric_captured_at)
}
Err(e) => {
let root = e.root_cause();
// these are point in time, with variable "now"
let metrics = collect_all_metrics(&cached_metrics, &ctx).await;
let maybe_ioerr = root.downcast_ref::<std::io::Error>();
let is_not_found =
maybe_ioerr.is_some_and(|e| e.kind() == std::io::ErrorKind::NotFound);
if metrics.is_empty() {
continue;
}
if !is_not_found {
tracing::info!(
"failed to read any previous metrics from {final_path:?}: {e:#}"
);
let metrics = Arc::new(metrics);
// why not race cancellation here? because we are one of the last tasks, and if we are
// already here, better to try to flush the new values.
let flush = async {
match flush_metrics_to_disk(&metrics, &final_path).await {
Ok(()) => {
tracing::debug!("flushed metrics to disk");
}
Err(e) => {
// idea here is that if someone creates a directory as our final_path, then they
// might notice it from the logs before shutdown and remove it
tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
}
(HashMap::new(), None)
}
};
if let Some(oldest_metric_captured_at) = oldest_metric_captured_at {
// FIXME: chrono methods panic
let oldest_metric_captured_at: SystemTime = oldest_metric_captured_at.into();
let now = SystemTime::now();
let error = match now.duration_since(oldest_metric_captured_at) {
Ok(from_last_send) if from_last_send < metric_collection_interval => {
let sleep_for = metric_collection_interval - from_last_send;
let deadline = std::time::Instant::now() + sleep_for;
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = tokio::time::sleep_until(deadline.into()) => {},
}
let now = std::time::Instant::now();
// executor threads might be busy, add extra measurements
Some(if now < deadline {
deadline - now
} else {
now - deadline
})
}
Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
Err(_) => {
tracing::warn!(
?now,
?oldest_metric_captured_at,
"oldest recorded metric is in future; first values will come out with inconsistent timestamps"
);
oldest_metric_captured_at.duration_since(now).ok()
let upload = async {
let res = upload_metrics(
&client,
metric_collection_endpoint,
&cancel,
&node_id,
&metrics,
&mut cached_metrics,
)
.await;
if let Err(e) = res {
// serialization error which should never happen
tracing::error!("failed to upload due to {e:#}");
}
};
// let these run concurrently
let (_, _) = tokio::join!(flush, upload);
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
metric_collection_interval,
"consumption_metrics_collect_metrics",
);
}
}
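
Both the old loop body shown above and the new `iteration` helper later in the diff flush the freshly collected metrics to disk and upload them at the same time; as the comment notes, cancellation is deliberately not raced at this point because the task is one of the last to shut down and flushing the new values is preferred. A rough sketch of that flush-and-upload-concurrently shape, with hypothetical `flush_to_disk`/`upload` stand-ins for the real `flush_metrics_to_disk`/`upload_metrics`:

```rust
use std::path::Path;
use std::sync::Arc;

// Hypothetical stand-ins for the real helpers.
async fn flush_to_disk(_metrics: &[u8], _path: &Path) -> std::io::Result<()> {
    Ok(())
}

async fn upload(_metrics: &[u8]) -> Result<(), String> {
    Ok(())
}

async fn persist_and_upload(metrics: Arc<Vec<u8>>, final_path: &Path) {
    let flush = async {
        if let Err(e) = flush_to_disk(&metrics, final_path).await {
            // A failed local flush is logged but must not block the upload.
            eprintln!("failed to persist metrics to {final_path:?}: {e:#}");
        }
    };

    let send = async {
        if let Err(e) = upload(&metrics).await {
            eprintln!("failed to upload due to {e}");
        }
    };

    // Run both futures concurrently on this task and wait for both to finish.
    let (_, _) = tokio::join!(flush, send);
}
```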
/// Called on the first iteration in an attempt to join the metric uploading schedule from the
/// previous pageserver session. The pageserver is supposed to upload at intervals regardless of restarts.
///
/// Cancellation safe.
async fn restore_and_reschedule(
final_path: &Arc<PathBuf>,
metric_collection_interval: Duration,
) -> Cache {
let (cached, earlier_metric_at) = match read_metrics_from_disk(final_path.clone()).await {
Ok(found_some) => {
// there is no min needed because we write these sequentially in
// collect_all_metrics
let earlier_metric_at = found_some
.iter()
.map(|(_, (et, _))| et.recorded_at())
.copied()
.next();
let cached = found_some.into_iter().collect::<Cache>();
(cached, earlier_metric_at)
}
Err(e) => {
use std::io::{Error, ErrorKind};
let root = e.root_cause();
let maybe_ioerr = root.downcast_ref::<Error>();
let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
if !is_not_found {
tracing::info!("failed to read any previous metrics from {final_path:?}: {e:#}");
}
(HashMap::new(), None)
}
};
if let Some(earlier_metric_at) = earlier_metric_at {
let earlier_metric_at: SystemTime = earlier_metric_at.into();
let error = reschedule(earlier_metric_at, metric_collection_interval).await;
if let Some(error) = error {
if error.as_secs() >= 60 {
tracing::info!(
@@ -352,83 +391,41 @@ pub async fn collect_metrics(
}
}
// reminder: ticker is ready immediately
let mut ticker = tokio::time::interval(metric_collection_interval);
loop {
let tick_at = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};
iteration(
&client,
metric_collection_endpoint,
&cancel,
&mut cached_metrics,
&node_id,
&final_path,
&ctx,
)
.await;
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
metric_collection_interval,
"consumption_metrics_collect_metrics",
);
}
cached
}
async fn iteration(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
cancel: &CancellationToken,
cached_metrics: &mut Cache,
node_id: &str,
final_path: &Arc<PathBuf>,
ctx: &RequestContext,
) {
// these are point in time, with variable "now"
let metrics = collect_all_metrics(cached_metrics, ctx).await;
async fn reschedule(
earlier_metric_at: SystemTime,
metric_collection_interval: Duration,
) -> Option<Duration> {
let now = SystemTime::now();
match now.duration_since(earlier_metric_at) {
Ok(from_last_send) if from_last_send < metric_collection_interval => {
let sleep_for = metric_collection_interval - from_last_send;
if metrics.is_empty() {
return;
let deadline = std::time::Instant::now() + sleep_for;
tokio::time::sleep_until(deadline.into()).await;
let now = std::time::Instant::now();
// executor threads might be busy, add extra measurements
Some(if now < deadline {
deadline - now
} else {
now - deadline
})
}
Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
Err(_) => {
tracing::warn!(
?now,
?earlier_metric_at,
"oldest recorded metric is in future; first values will come out with inconsistent timestamps"
);
earlier_metric_at.duration_since(now).ok()
}
}
let metrics = Arc::new(metrics);
let flush = async {
match flush_metrics_to_disk(&metrics, final_path).await {
Ok(()) => {
tracing::debug!("flushed metrics to disk");
}
Err(e) => {
// idea here is that if someone creates a directory as our final_path, then they
// might notice it from the logs before shutdown and remove it
tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
}
}
};
let upload = async {
let res = upload_metrics(
client,
metric_collection_endpoint,
cancel,
node_id,
&metrics,
cached_metrics,
)
.await;
if let Err(e) = res {
// serialization error which should never happen
tracing::error!("failed to upload due to {e:#}");
}
};
// let these run concurrently
let (_, _) = tokio::join!(flush, upload);
}
async fn collect_all_metrics(cached_metrics: &Cache, ctx: &RequestContext) -> Vec<RawMetric> {
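
`restore_and_reschedule` and `reschedule`, interleaved above with the inline code they replace, exist so a restarted pageserver rejoins its previous upload cadence: starting from the timestamp recovered from the persisted metrics (`earlier_metric_at`), it waits out whatever remains of the collection interval rather than collecting immediately. A condensed sketch of just that arithmetic (the real `reschedule` also performs the sleep and reports the scheduling error for logging):

```rust
use std::time::{Duration, SystemTime};

// Given when the last persisted metric batch was recorded and the collection
// interval, return how long to wait before the next collection so restarts
// keep the previous cadence. Returns zero if the interval has already elapsed.
fn time_until_next_collection(
    earlier_metric_at: SystemTime,
    interval: Duration,
) -> Duration {
    match SystemTime::now().duration_since(earlier_metric_at) {
        // Less than a full interval has passed: wait out the remainder.
        Ok(elapsed) if elapsed < interval => interval - elapsed,
        // A full interval (or more) has passed: collect right away.
        Ok(_) => Duration::ZERO,
        // The recorded timestamp is in the future (clock moved backwards);
        // don't wait, but expect slightly inconsistent timestamps at first.
        Err(_) => Duration::ZERO,
    }
}
```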
@@ -929,7 +926,7 @@ async fn calculate_synthetic_size_worker(
) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker");
// reminder: this ticker is ready right away
// reminder: ticker is ready immediately
let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
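
The recurring "ticker is ready immediately" reminder, here and in `collect_metrics`, refers to `tokio::time::interval` semantics: the first `tick()` resolves at once, so each worker runs its body right after startup and then once per period. A small standalone illustration; the ten-second period is only this example's choice, not a pageserver setting.

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    let period = Duration::from_secs(10);
    let mut ticker = tokio::time::interval(period);

    let started = tokio::time::Instant::now();

    // The first tick completes immediately, which is why the workers in this
    // file do their first collection right after startup.
    ticker.tick().await;
    println!("first tick after {:?}", started.elapsed()); // ~0ms

    // Subsequent ticks fire once per period.
    ticker.tick().await;
    println!("second tick after {:?}", started.elapsed()); // ~10s
}
```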