remove more metrics, still compiles

This commit is contained in:
BodoBolero
2025-04-17 16:12:06 +02:00
parent ef81d0b81d
commit 9584f65950
8 changed files with 56 additions and 283 deletions

View File

@@ -569,9 +569,7 @@ impl RequestContext {
}
match &self.scope {
Scope::Timeline { arc_arc } => arc_arc
.wait_ondemand_download_time
.observe(self.task_kind, duration),
Scope::Timeline { arc_arc: _ } => {},
_ => {
use once_cell::sync::Lazy;
use std::sync::Mutex;

View File

@@ -17,7 +17,6 @@ use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use postgres_backend::{QueryError, is_expected_io_error};
use pq_proto::framed::ConnectionError;
@@ -31,7 +30,7 @@ use crate::pgdatadir_mapping::DatadirModificationStats;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant::mgr::TenantSlot;
use crate::tenant::storage_layer::{InMemoryLayer, PersistentLayerDesc};
use crate::tenant::storage_layer::PersistentLayerDesc;
use crate::tenant::throttle::ThrottleResult;
/// Prometheus histogram buckets (in seconds) for operations in the critical
@@ -51,8 +50,6 @@ const CRITICAL_OP_BUCKETS: &[f64] = &[
#[derive(Debug, VariantNames, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum StorageTimeOperation {
#[strum(serialize = "layer flush")]
LayerFlush,
#[strum(serialize = "layer flush delay")]
LayerFlushDelay,
@@ -69,9 +66,6 @@ pub(crate) enum StorageTimeOperation {
#[strum(serialize = "imitate logical size")]
ImitateLogicalSize,
#[strum(serialize = "load layer map")]
LoadLayerMap,
#[strum(serialize = "gc")]
Gc,
@@ -238,20 +232,6 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
});
pub(crate) mod wait_ondemand_download_time {
use super::*;
pub struct WaitOndemandDownloadTimeSum {
}
impl WaitOndemandDownloadTimeSum {
pub(crate) fn new(_tenant_id: &str, _shard_id: &str, _timeline_id: &str) -> Self {
Self {
}
}
pub(crate) fn observe(&self, _task_kind: TaskKind, _duration: Duration) {
}
}
pub(crate) fn shutdown_timeline(_tenant_id: &str, _shard_id: &str, _timeline_id: &str) {
}
@@ -769,16 +749,6 @@ pub(crate) mod virtual_file_descriptor_cache {
#[cfg(not(test))]
pub(crate) mod virtual_file_io_engine {
use super::*;
pub(crate) static KIND: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_virtual_file_io_engine_kind",
"The configured io engine for VirtualFile",
&["kind"],
)
.unwrap()
});
}
pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
@@ -828,9 +798,6 @@ impl SmgrOpTimer {
pub(crate) fn observe_throttle_done(&mut self, _throttle: ThrottleResult) {
}
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_execution_start(&mut self, _at: Instant) {
}
}
@@ -1804,34 +1771,24 @@ pub(crate) struct TimelineMetrics {
tenant_id: String,
shard_id: String,
timeline_id: String,
pub flush_time_histo: StorageTimeMetrics,
pub flush_delay_histo: StorageTimeMetrics,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
pub imitate_logical_size_histo: StorageTimeMetrics,
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub find_gc_cutoffs_histo: StorageTimeMetrics,
pub disk_consistent_lsn_gauge: IntGauge,
pub pitr_history_size: UIntGauge,
pub archival_size: UIntGauge,
pub layers_per_read: Histogram,
pub standby_horizon_gauge: IntGauge,
pub resident_physical_size_gauge: UIntGauge,
pub visible_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub aux_file_size_gauge: IntGauge,
pub directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>>,
pub evictions: IntCounter,
/// Number of valid LSN leases.
pub valid_lsn_lease_count_gauge: UIntGauge,
pub evictions: IntCounter,
pub wal_records_received: IntCounter,
pub storage_io_size: StorageIoSizeMetrics,
pub wait_lsn_in_progress_micros: GlobalAndPerTenantIntCounter,
pub wait_lsn_start_finish_counterpair: IntCounterPair,
pub wait_ondemand_download_time: wait_ondemand_download_time::WaitOndemandDownloadTimeSum,
shutdown: std::sync::atomic::AtomicBool,
}
@@ -1843,12 +1800,7 @@ impl TimelineMetrics {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", tenant_shard_id.shard_slug());
let timeline_id = timeline_id_raw.to_string();
let flush_time_histo = StorageTimeMetrics::new(
StorageTimeOperation::LayerFlush,
&tenant_id,
&shard_id,
&timeline_id,
);
let flush_delay_histo = StorageTimeMetrics::new(
StorageTimeOperation::LayerFlushDelay,
&tenant_id,
@@ -1879,12 +1831,6 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let load_layer_map_histo = StorageTimeMetrics::new(
StorageTimeOperation::LoadLayerMap,
&tenant_id,
&shard_id,
&timeline_id,
);
let garbage_collect_histo = StorageTimeMetrics::new(
StorageTimeOperation::Gc,
&tenant_id,
@@ -1898,10 +1844,6 @@ impl TimelineMetrics {
&timeline_id,
);
let disk_consistent_lsn_gauge = DISK_CONSISTENT_LSN
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let pitr_history_size = PITR_HISTORY_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
@@ -1910,16 +1852,9 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let layers_per_read = LAYERS_PER_READ
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let standby_horizon_gauge = STANDBY_HORIZON
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let visible_physical_size_gauge = VISIBLE_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
@@ -1950,36 +1885,16 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let valid_lsn_lease_count_gauge = VALID_LSN_LEASE_COUNT
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let wal_records_received = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let storage_io_size = StorageIoSizeMetrics::new(&tenant_id, &shard_id, &timeline_id);
let wait_lsn_in_progress_micros = GlobalAndPerTenantIntCounter {
};
let wait_lsn_start_finish_counterpair = WAIT_LSN_START_FINISH_COUNTERPAIR
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let wait_ondemand_download_time =
wait_ondemand_download_time::WaitOndemandDownloadTimeSum::new(
&tenant_id,
&shard_id,
&timeline_id,
);
TimelineMetrics {
tenant_id,
shard_id,
timeline_id,
flush_time_histo,
flush_delay_histo,
compact_time_histo,
create_images_time_histo,
@@ -1987,31 +1902,20 @@ impl TimelineMetrics {
imitate_logical_size_histo,
garbage_collect_histo,
find_gc_cutoffs_histo,
load_layer_map_histo,
disk_consistent_lsn_gauge,
pitr_history_size,
archival_size,
layers_per_read,
standby_horizon_gauge,
resident_physical_size_gauge,
visible_physical_size_gauge,
current_logical_size_gauge,
aux_file_size_gauge,
directory_entries_count_gauge,
evictions,
storage_io_size,
valid_lsn_lease_count_gauge,
wal_records_received,
wait_lsn_in_progress_micros,
wait_lsn_start_finish_counterpair,
wait_ondemand_download_time,
shutdown: std::sync::atomic::AtomicBool::default(),
}
}
pub(crate) fn record_new_file_metrics(&self, sz: u64) {
self.resident_physical_size_add(sz);
}
pub(crate) fn resident_physical_size_sub(&self, _sz: u64) {
// self.resident_physical_size_gauge.sub(sz);
@@ -2028,17 +1932,6 @@ impl TimelineMetrics {
0 // FIXME: Return dummy value as gauge access is commented out
}
/// Removes a frozen ephemeral layer to TIMELINE_LAYER metrics.
pub fn dec_frozen_layer(&self, layer: &InMemoryLayer) {
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
}
/// Adds a frozen ephemeral layer to TIMELINE_LAYER metrics.
pub fn inc_frozen_layer(&self, _layer: &InMemoryLayer) {
}
/// Removes a persistent layer from TIMELINE_LAYER metrics.
pub fn dec_layer(&self, _layer_desc: &PersistentLayerDesc) {
}
@@ -2643,18 +2536,6 @@ pub mod tokio_epoll_uring {
.unwrap()
});
}
pub(crate) struct GlobalAndPerTenantIntCounter {
}
impl GlobalAndPerTenantIntCounter {
#[inline(always)]
pub(crate) fn inc_by(&self, _n: u64) {
// self.global.inc_by(n);
// self.per_tenant.inc_by(n);
}
}
pub(crate) mod tenant_throttling {
use metrics::register_int_counter_vec;
use once_cell::sync::Lazy;

View File

@@ -637,7 +637,6 @@ impl std::fmt::Display for BatchedPageStreamError {
struct BatchedGetPageRequest {
req: PagestreamGetPageRequest,
timer: SmgrOpTimer,
effective_request_lsn: Lsn,
ctx: RequestContext,
}
@@ -645,7 +644,6 @@ struct BatchedGetPageRequest {
#[cfg(feature = "testing")]
struct BatchedTestRequest {
req: models::PagestreamTestRequest,
timer: SmgrOpTimer,
}
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
@@ -655,13 +653,13 @@ struct BatchedTestRequest {
enum BatchedFeMessage {
Exists {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
req: models::PagestreamNblocksRequest,
},
@@ -673,13 +671,13 @@ enum BatchedFeMessage {
},
DbSize {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
@@ -700,27 +698,7 @@ impl BatchedFeMessage {
self.into()
}
fn observe_execution_start(&mut self, at: Instant) {
match self {
BatchedFeMessage::Exists { timer, .. }
| BatchedFeMessage::Nblocks { timer, .. }
| BatchedFeMessage::DbSize { timer, .. }
| BatchedFeMessage::GetSlruSegment { timer, .. } => {
timer.observe_execution_start(at);
}
BatchedFeMessage::GetPage { pages, .. } => {
for page in pages {
page.timer.observe_execution_start(at);
}
}
#[cfg(feature = "testing")]
BatchedFeMessage::Test { requests, .. } => {
for req in requests {
req.timer.observe_execution_start(at);
}
}
BatchedFeMessage::RespondError { .. } => {}
}
fn observe_execution_start(&mut self, _at: Instant) {
}
fn should_break_batch(
@@ -960,7 +938,7 @@ impl PageServerHandler {
.await?;
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelExists,
received_at,
@@ -968,7 +946,7 @@ impl PageServerHandler {
.await?;
BatchedFeMessage::Exists {
span,
timer,
shard: shard.downgrade(),
req,
}
@@ -978,7 +956,7 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelSize,
received_at,
@@ -986,7 +964,7 @@ impl PageServerHandler {
.await?;
BatchedFeMessage::Nblocks {
span,
timer,
shard: shard.downgrade(),
req,
}
@@ -996,7 +974,7 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetDbSize,
received_at,
@@ -1004,7 +982,7 @@ impl PageServerHandler {
.await?;
BatchedFeMessage::DbSize {
span,
timer,
shard: shard.downgrade(),
req,
}
@@ -1014,7 +992,7 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetSlruSegment,
received_at,
@@ -1022,7 +1000,7 @@ impl PageServerHandler {
.await?;
BatchedFeMessage::GetSlruSegment {
span,
timer,
shard: shard.downgrade(),
req,
}
@@ -1121,7 +1099,7 @@ impl PageServerHandler {
// request handler log messages contain the request-specific fields.
let span = mkspan!(shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
@@ -1154,7 +1132,6 @@ impl PageServerHandler {
shard: shard.downgrade(),
pages: smallvec::smallvec![BatchedGetPageRequest {
req,
timer,
effective_request_lsn,
ctx,
}],
@@ -1170,13 +1147,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
let timer =
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
.await?;
BatchedFeMessage::Test {
span,
shard: shard.downgrade(),
requests: vec![BatchedTestRequest { req, timer }],
requests: vec![BatchedTestRequest { req, }],
}
}
};
@@ -1363,7 +1339,7 @@ impl PageServerHandler {
})
}
},
Ok((response_msg, _op_timer_already_observed)) => response_msg,
Ok((response_msg, )) => response_msg,
};
//
@@ -1417,7 +1393,7 @@ impl PageServerHandler {
ctx: &RequestContext,
) -> Result<
(
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
Vec<Result<(PagestreamBeMessage, ), BatchedPageStreamError>>,
Span,
),
QueryError,
@@ -1433,7 +1409,7 @@ impl PageServerHandler {
Ok(match batch {
BatchedFeMessage::Exists {
span,
timer,
shard,
req,
} => {
@@ -1444,7 +1420,7 @@ impl PageServerHandler {
self.handle_get_rel_exists_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, ))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1452,7 +1428,7 @@ impl PageServerHandler {
}
BatchedFeMessage::Nblocks {
span,
timer,
shard,
req,
} => {
@@ -1463,7 +1439,7 @@ impl PageServerHandler {
self.handle_get_nblocks_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, ))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1499,7 +1475,6 @@ impl PageServerHandler {
}
BatchedFeMessage::DbSize {
span,
timer,
shard,
req,
} => {
@@ -1510,7 +1485,7 @@ impl PageServerHandler {
self.handle_db_size_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, ))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1518,7 +1493,6 @@ impl PageServerHandler {
}
BatchedFeMessage::GetSlruSegment {
span,
timer,
shard,
req,
} => {
@@ -1529,7 +1503,7 @@ impl PageServerHandler {
self.handle_get_slru_segment_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, ))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -2177,7 +2151,7 @@ impl PageServerHandler {
io_concurrency: IoConcurrency,
_batch_break_reason: GetPageBatchBreakReason,
ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
) -> Vec<Result<(PagestreamBeMessage, ), BatchedPageStreamError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
// If a page trace is running, submit an event for this request.
@@ -2279,7 +2253,7 @@ impl PageServerHandler {
req: req.req,
page,
}),
req.timer,
)
})
.map_err(|e| BatchedPageStreamError {
@@ -2324,7 +2298,7 @@ impl PageServerHandler {
timeline: &Timeline,
requests: Vec<BatchedTestRequest>,
_ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
) -> Vec<Result<(PagestreamBeMessage,), BatchedPageStreamError>> {
// real requests would do something with the timeline
let mut results = Vec::with_capacity(requests.len());
for _req in requests.iter() {
@@ -2350,7 +2324,6 @@ impl PageServerHandler {
PagestreamBeMessage::Test(models::PagestreamTestResponse {
req: req.req.clone(),
}),
req.timer,
)
})
.map_err(|e| BatchedPageStreamError {

View File

@@ -3850,33 +3850,13 @@ impl Tenant {
}
pub(crate) fn get_sizes(&self) -> TopTenantShardItem {
let mut result = TopTenantShardItem {
TopTenantShardItem {
id: self.tenant_shard_id,
resident_size: 0,
physical_size: 0,
max_logical_size: 0,
max_logical_size_per_shard: 0,
};
for timeline in self.timelines.lock().unwrap().values() {
result.resident_size += timeline.metrics.resident_physical_size_gauge.get();
result.physical_size += timeline
.remote_client
.metrics
.remote_physical_size_gauge
.get();
result.max_logical_size = std::cmp::max(
result.max_logical_size,
timeline.metrics.current_logical_size_gauge.get(),
);
}
result.max_logical_size_per_shard = result
.max_logical_size
.div_ceil(self.tenant_shard_id.shard_count.count() as u64);
result
}
}
@@ -4666,10 +4646,6 @@ impl Tenant {
let now = SystemTime::now();
target.leases.retain(|_, lease| !lease.is_expired(&now));
timeline
.metrics
.valid_lsn_lease_count_gauge
.set(target.leases.len() as u64);
// Look up parent's PITR cutoff to update the child's knowledge of whether it is within parent's PITR
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {

View File

@@ -105,8 +105,7 @@ use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate,
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::l0_flush::{self, L0FlushGlobalState};
use crate::metrics::{
DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_AMORTIZED_GLOBAL, LAYERS_PER_READ_BATCH_GLOBAL,
LAYERS_PER_READ_GLOBAL, TimelineMetrics,
DELTAS_PER_READ_GLOBAL, TimelineMetrics,
};
use crate::page_service::TenantManagerTypes;
use crate::pgdatadir_mapping::{
@@ -1423,42 +1422,21 @@ impl Timeline {
// when they're missing. Instead they are omitted from the resulting btree
// (this is a requirement, not a bug). Skip updating the metric in these cases
// to avoid infinite results.
if !results.is_empty() {
if layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD {
let total_keyspace = query.total_keyspace();
let max_request_lsn = query.high_watermark_lsn().expect("Validated previously");
if !results.is_empty() && layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD {
let total_keyspace = query.total_keyspace();
let max_request_lsn = query.high_watermark_lsn().expect("Validated previously");
static LOG_PACER: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(60))));
LOG_PACER.lock().unwrap().call(|| {
let num_keys = total_keyspace.total_raw_size();
let num_pages = results.len();
tracing::info!(
shard_id = %self.tenant_shard_id.shard_slug(),
lsn = %max_request_lsn,
"Vectored read for {total_keyspace} visited {layers_visited} layers. Returned {num_pages}/{num_keys} pages.",
);
});
}
// Records the number of layers visited in a few different ways:
//
// * LAYERS_PER_READ: all layers count towards every read in the batch, because each
// layer directly affects its observed latency.
//
// * LAYERS_PER_READ_BATCH: all layers count towards each batch, to get the per-batch
// layer visits and access cost.
//
// * LAYERS_PER_READ_AMORTIZED: the average layer count per read, to get the amortized
// read amplification after batching.
let layers_visited = layers_visited as f64;
let avg_layers_visited = layers_visited / results.len() as f64;
LAYERS_PER_READ_BATCH_GLOBAL.observe(layers_visited);
for _ in &results {
self.metrics.layers_per_read.observe(layers_visited);
LAYERS_PER_READ_GLOBAL.observe(layers_visited);
LAYERS_PER_READ_AMORTIZED_GLOBAL.observe(avg_layers_visited);
}
static LOG_PACER: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(60))));
LOG_PACER.lock().unwrap().call(|| {
let num_keys = total_keyspace.total_raw_size();
let num_pages = results.len();
tracing::info!(
shard_id = %self.tenant_shard_id.shard_slug(),
lsn = %max_request_lsn,
"Vectored read for {total_keyspace} visited {layers_visited} layers. Returned {num_pages}/{num_keys} pages.",
);
});
}
Ok(results)
@@ -1576,8 +1554,6 @@ impl Timeline {
};
let timer = crate::metrics::WAIT_LSN_TIME.start_timer();
let start_finish_counterpair_guard = self.metrics.wait_lsn_start_finish_counterpair.guard();
let wait_for_timeout = self.last_record_lsn.wait_for_timeout(lsn, timeout);
let wait_for_timeout = std::pin::pin!(wait_for_timeout);
// Use threshold of 1 because even 1 second of wait for ingest is very much abnormal.
@@ -1593,11 +1569,8 @@ impl Timeline {
ready,
is_slow,
elapsed_total,
elapsed_since_last_callback,
elapsed_since_last_callback: _,
}| {
self.metrics
.wait_lsn_in_progress_micros
.inc_by(u64::try_from(elapsed_since_last_callback.as_micros()).unwrap());
if !is_slow {
return;
}
@@ -1627,7 +1600,6 @@ impl Timeline {
let res = wait_for_timeout.await;
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
drop(logging_permit);
drop(start_finish_counterpair_guard);
drop(timer);
match res {
Ok(()) => Ok(()),
@@ -3131,8 +3103,6 @@ impl Timeline {
let mut guard = self.layers.write().await;
let timer = self.metrics.load_layer_map_histo.start_timer();
// Scan timeline directory and create ImageLayerName and DeltaFilename
// structs representing all files on disk
let timeline_path = self
@@ -3293,7 +3263,6 @@ impl Timeline {
num_layers, disk_consistent_lsn, total_physical_size
);
timer.stop_and_record();
Ok(())
}
@@ -4590,7 +4559,7 @@ impl Timeline {
}
// Flush the layer.
let flush_timer = self.metrics.flush_time_histo.start_timer();
let flush_timer = Instant::now();
match self.flush_frozen_layer(layer, ctx).await {
Ok(layer_lsn) => flushed_to_lsn = max(flushed_to_lsn, layer_lsn),
Err(FlushLayerError::Cancelled) => {
@@ -4606,7 +4575,7 @@ impl Timeline {
break err.map(|_| ());
}
}
let flush_duration = flush_timer.stop_and_record();
let flush_duration = flush_timer.elapsed();
// Notify the tenant compaction loop if L0 compaction is needed.
let l0_count = *watch_l0.borrow();
@@ -4870,9 +4839,6 @@ impl Timeline {
"disk_consistent_lsn must be growing monotonously at runtime; current {old_value}, offered {new_value}"
);
self.metrics
.disk_consistent_lsn_gauge
.set(new_value.0 as i64);
new_value != old_value
}

View File

@@ -290,7 +290,7 @@ impl OpenLayerManager {
lsn: Lsn,
last_freeze_at: &AtomicLsn,
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
metrics: &TimelineMetrics,
_metrics: &TimelineMetrics,
) -> bool {
let Lsn(last_record_lsn) = lsn;
let end_lsn = Lsn(last_record_lsn + 1);
@@ -299,10 +299,6 @@ impl OpenLayerManager {
let open_layer_rc = Arc::clone(open_layer);
open_layer.freeze(end_lsn).await;
// Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
// TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
// reference to the timeline metrics. Other methods use a metrics borrow as well.
metrics.inc_frozen_layer(open_layer);
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
@@ -334,16 +330,12 @@ impl OpenLayerManager {
pub(crate) fn track_new_image_layers(
&mut self,
image_layers: &[ResidentLayer],
metrics: &TimelineMetrics,
_metrics: &TimelineMetrics,
) {
let mut updates = self.layer_map.batch_update();
for layer in image_layers {
Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
// record these here instead of Layer::finish_creating because otherwise partial
// failure with create_image_layers would balloon up the physical size gauge. downside
// is that all layers need to be created before metrics are updated.
metrics.record_new_file_metrics(layer.layer_desc().file_size);
}
updates.flush();
}
@@ -353,14 +345,13 @@ impl OpenLayerManager {
&mut self,
delta_layer: Option<&ResidentLayer>,
frozen_layer_for_check: &Arc<InMemoryLayer>,
metrics: &TimelineMetrics,
_metrics: &TimelineMetrics,
) {
let inmem = self
.layer_map
.frozen_layers
.pop_front()
.expect("there must be a inmem layer to flush");
metrics.dec_frozen_layer(&inmem);
// Only one task may call this function at a time (for this
// timeline). If two tasks tried to flush the same frozen
@@ -370,7 +361,6 @@ impl OpenLayerManager {
if let Some(l) = delta_layer {
let mut updates = self.layer_map.batch_update();
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
metrics.record_new_file_metrics(l.layer_desc().file_size);
updates.flush();
}
}
@@ -380,12 +370,11 @@ impl OpenLayerManager {
&mut self,
compact_from: &[Layer],
compact_to: &[ResidentLayer],
metrics: &TimelineMetrics,
_metrics: &TimelineMetrics,
) {
let mut updates = self.layer_map.batch_update();
for l in compact_to {
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
metrics.record_new_file_metrics(l.layer_desc().file_size);
}
for l in compact_from {
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
@@ -438,7 +427,7 @@ impl OpenLayerManager {
rewrite_layers: &[(Layer, ResidentLayer)],
drop_layers: &[Layer],
add_layers: &[ResidentLayer],
metrics: &TimelineMetrics,
_metrics: &TimelineMetrics,
) {
let mut updates = self.layer_map.batch_update();
for (old_layer, new_layer) in rewrite_layers {
@@ -469,14 +458,12 @@ impl OpenLayerManager {
&mut self.layer_fmgr,
);
metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
}
for l in drop_layers {
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
}
for l in add_layers {
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
metrics.record_new_file_metrics(l.layer_desc().file_size);
}
updates.flush();
}

View File

@@ -572,7 +572,6 @@ pub(super) async fn handle_walreceiver_connection(
}
// Ingest the records without immediately committing them.
timeline.metrics.wal_records_received.inc();
let ingested = walingest
.ingest_record(interpreted, &mut modification, &ctx)
.await

View File

@@ -54,14 +54,7 @@ static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
pub(crate) fn set(engine_kind: IoEngineKind) {
let engine: IoEngine = engine_kind.into();
IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
#[cfg(not(test))]
{
let metric = &crate::metrics::virtual_file_io_engine::KIND;
metric.reset();
metric
.with_label_values(&[&format!("{engine_kind}")])
.set(1);
}
}
#[cfg(not(test))]