feat(metrics): per-timeline metric for on-demand downloads, remove calls_started histogram (#6834)
refs #6737

# Problem

Before this PR, on-demand downloads weren't measured per tenant_id. This makes root-cause analysis of latency spikes harder, requiring us to resort to log scraping:

```
{neon_service="pageserver"} |= `downloading on-demand` |= `$tenant_id`
```

which can be expensive when zooming out in Grafana.

Context: https://neondb.slack.com/archives/C033RQ5SPDH/p1707809037868189

# Solution / Changes

- Remove the calls_started histogram.
  - I did the diligence; only 2 dashboards use this histogram, and in fact only one uses it as a histogram, the other just as a counter.
  - [Link 1](8115b54d9f/neonprod/dashboards/hkXNF7oVz/dashboard-Z31XmM24k.yaml (L1454)): `Pageserver Thrashing` dashboard, linked from the playbook, will fix.
  - [Link 2](8115b54d9f/neonprod/dashboards/CEllzAO4z/dashboard-sJqfNFL4k.yaml (L599)): one of my personal dashboards, unused for a long time, already broken in other ways, no need to fix.
- Replace the `pageserver_remote_timeline_client_calls_unfinished` gauge with a counter pair.
  - This required a `Clone`-able `IntCounterPair`; made the necessary changes in the `libs/metrics` crate.
- Fix tests to deal with the fallout.

A subsequent PR will remove a timeline-scoped metric to compensate. Note that we don't need additional global counters for the per-timeline counters affected by this PR; we can use the `remote_storage` histogram for those, which, conveniently, also includes the secondary-mode downloads, which aren't covered by the remote timeline client metrics (should they be?).
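With the counter pair in place, per-tenant queue depth (in-flight calls) can be derived at query time. A sketch in PromQL, using the metric names introduced below — a best-effort illustration, not a query shipped with this PR:

```
sum by (tenant_id, timeline_id, file_kind, op_kind) (
    pageserver_remote_timeline_client_calls_started_total
  - pageserver_remote_timeline_client_calls_finished_total
)
```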
Committed by: GitHub
Parent: eb02f4619e
Commit: e49602ecf5
```diff
@@ -201,6 +201,11 @@ impl<P: Atomic> GenericCounterPairVec<P> {
     pub fn with_label_values(&self, vals: &[&str]) -> GenericCounterPair<P> {
         self.get_metric_with_label_values(vals).unwrap()
     }
+
+    pub fn remove_label_values(&self, res: &mut [Result<()>; 2], vals: &[&str]) {
+        res[0] = self.inc.remove_label_values(vals);
+        res[1] = self.dec.remove_label_values(vals);
+    }
 }

 impl<P: Atomic> GenericCounterPair<P> {
@@ -247,6 +252,15 @@ impl<P: Atomic> GenericCounterPair<P> {
     }
 }

+impl<P: Atomic> Clone for GenericCounterPair<P> {
+    fn clone(&self) -> Self {
+        Self {
+            inc: self.inc.clone(),
+            dec: self.dec.clone(),
+        }
+    }
+}
+
 /// Guard returned by [`GenericCounterPair::guard`]
 pub struct GenericCounterPairGuard<P: Atomic>(GenericCounter<P>);
```
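For orientation, a minimal sketch of how the counter-pair API extended by these hunks is used. The macro argument order follows the `register_int_counter_pair_vec!` call visible in the next file's diff; the metric and function names here are illustrative only:

```rust
use metrics::{register_int_counter_pair_vec, IntCounterPair, IntCounterPairVec};
use once_cell::sync::Lazy;

// A counter pair: `inc()` bumps the first counter, `dec()` bumps the
// second. Their difference is the number of in-flight operations.
static EXAMPLE_CALLS: Lazy<IntCounterPairVec> = Lazy::new(|| {
    register_int_counter_pair_vec!(
        "example_calls_started_total",
        "Number of started example calls.",
        "example_calls_finished_total",
        "Number of finished example calls.",
        &["op_kind"],
    )
    .expect("failed to define a metric")
});

fn track_call() {
    // The `Clone` impl added above lets the pair be cached per timeline
    // and handed out to callers.
    let pair: IntCounterPair = EXAMPLE_CALLS.with_label_values(&["upload"]);
    pair.inc(); // records the call as started
    // ... do the work ...
    pair.dec(); // records the call as finished
}
```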
```diff
@@ -4,8 +4,8 @@ use metrics::{
     register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
     register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
     register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
-    Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPairVec,
-    IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
+    Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
+    IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
 };
 use once_cell::sync::Lazy;
 use pageserver_api::shard::TenantShardId;
@@ -1266,13 +1266,12 @@ pub(crate) static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {

 // remote storage metrics

-/// NB: increment _after_ recording the current value into [`REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST`].
-static REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE: Lazy<IntGaugeVec> = Lazy::new(|| {
-    register_int_gauge_vec!(
-        "pageserver_remote_timeline_client_calls_unfinished",
-        "Number of ongoing calls to remote timeline client. \
-         Used to populate pageserver_remote_timeline_client_calls_started. \
-         This metric is not useful for sampling from Prometheus, but useful in tests.",
+static REMOTE_TIMELINE_CLIENT_CALLS: Lazy<IntCounterPairVec> = Lazy::new(|| {
+    register_int_counter_pair_vec!(
+        "pageserver_remote_timeline_client_calls_started",
+        "Number of started calls to remote timeline client.",
+        "pageserver_remote_timeline_client_calls_finished",
+        "Number of finished calls to remote timeline client.",
         &[
             "tenant_id",
             "shard_id",
@@ -1281,23 +1280,7 @@ static REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE: Lazy<IntGaugeVec> = Lazy::
             "op_kind"
         ],
     )
     .expect("failed to define a metric")
 });

-static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new(|| {
-    register_histogram_vec!(
-        "pageserver_remote_timeline_client_calls_started",
-        "When calling a remote timeline client method, we record the current value \
-         of the calls_unfinished gauge in this histogram. Plot the histogram \
-         over time in a heatmap to visualize how many operations were ongoing \
-         at a given instant. It gives you a better idea of the queue depth \
-         than plotting the gauge directly, since operations may complete faster \
-         than the sampling interval.",
-        &["file_kind", "op_kind"],
-        // The calls_unfinished gauge is an integer gauge, hence we have integer buckets.
-        vec![0.0, 1.0, 2.0, 4.0, 6.0, 8.0, 10.0, 15.0, 20.0, 40.0, 60.0, 80.0, 100.0, 500.0],
-    )
-    .expect("failed to define a metric")
-});
-
 static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> =
@@ -2078,7 +2061,7 @@ pub(crate) struct RemoteTimelineClientMetrics {
     shard_id: String,
     timeline_id: String,
     remote_physical_size_gauge: Mutex<Option<PerTimelineRemotePhysicalSizeGauge>>,
-    calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
+    calls: Mutex<HashMap<(&'static str, &'static str), IntCounterPair>>,
     bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
     bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
 }
@@ -2089,7 +2072,7 @@ impl RemoteTimelineClientMetrics {
             tenant_id: tenant_shard_id.tenant_id.to_string(),
             shard_id: format!("{}", tenant_shard_id.shard_slug()),
             timeline_id: timeline_id.to_string(),
-            calls_unfinished_gauge: Mutex::new(HashMap::default()),
+            calls: Mutex::new(HashMap::default()),
             bytes_started_counter: Mutex::new(HashMap::default()),
             bytes_finished_counter: Mutex::new(HashMap::default()),
             remote_physical_size_gauge: Mutex::new(None),
@@ -2129,15 +2112,15 @@ impl RemoteTimelineClientMetrics {
             .unwrap()
     }

-    fn calls_unfinished_gauge(
+    fn calls_counter_pair(
         &self,
         file_kind: &RemoteOpFileKind,
         op_kind: &RemoteOpKind,
-    ) -> IntGauge {
-        let mut guard = self.calls_unfinished_gauge.lock().unwrap();
+    ) -> IntCounterPair {
+        let mut guard = self.calls.lock().unwrap();
         let key = (file_kind.as_str(), op_kind.as_str());
         let metric = guard.entry(key).or_insert_with(move || {
-            REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE
+            REMOTE_TIMELINE_CLIENT_CALLS
                 .get_metric_with_label_values(&[
                     &self.tenant_id,
                     &self.shard_id,
@@ -2150,17 +2133,6 @@ impl RemoteTimelineClientMetrics {
         metric.clone()
     }

-    fn calls_started_hist(
-        &self,
-        file_kind: &RemoteOpFileKind,
-        op_kind: &RemoteOpKind,
-    ) -> Histogram {
-        let key = (file_kind.as_str(), op_kind.as_str());
-        REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST
-            .get_metric_with_label_values(&[key.0, key.1])
-            .unwrap()
-    }
-
     fn bytes_started_counter(
         &self,
         file_kind: &RemoteOpFileKind,
@@ -2231,7 +2203,7 @@ impl RemoteTimelineClientMetrics {
 #[must_use]
 pub(crate) struct RemoteTimelineClientCallMetricGuard {
     /// Decremented on drop.
-    calls_unfinished_metric: Option<IntGauge>,
+    calls_counter_pair: Option<IntCounterPair>,
     /// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop.
     bytes_finished: Option<(IntCounter, u64)>,
 }
@@ -2241,10 +2213,10 @@ impl RemoteTimelineClientCallMetricGuard {
     /// The caller vouches to do the metric updates manually.
     pub fn will_decrement_manually(mut self) {
         let RemoteTimelineClientCallMetricGuard {
-            calls_unfinished_metric,
+            calls_counter_pair,
             bytes_finished,
         } = &mut self;
-        calls_unfinished_metric.take();
+        calls_counter_pair.take();
         bytes_finished.take();
     }
 }
@@ -2252,10 +2224,10 @@
 impl Drop for RemoteTimelineClientCallMetricGuard {
     fn drop(&mut self) {
         let RemoteTimelineClientCallMetricGuard {
-            calls_unfinished_metric,
+            calls_counter_pair,
             bytes_finished,
         } = self;
-        if let Some(guard) = calls_unfinished_metric.take() {
+        if let Some(guard) = calls_counter_pair.take() {
             guard.dec();
         }
         if let Some((bytes_finished_metric, value)) = bytes_finished {
@@ -2288,10 +2260,8 @@ impl RemoteTimelineClientMetrics {
         op_kind: &RemoteOpKind,
         size: RemoteTimelineClientMetricsCallTrackSize,
     ) -> RemoteTimelineClientCallMetricGuard {
-        let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
-        self.calls_started_hist(file_kind, op_kind)
-            .observe(calls_unfinished_metric.get() as f64);
-        calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric
+        let calls_counter_pair = self.calls_counter_pair(file_kind, op_kind);
+        calls_counter_pair.inc();

         let bytes_finished = match size {
             RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {
@@ -2305,7 +2275,7 @@ impl RemoteTimelineClientMetrics {
             }
         };
         RemoteTimelineClientCallMetricGuard {
-            calls_unfinished_metric: Some(calls_unfinished_metric),
+            calls_counter_pair: Some(calls_counter_pair),
             bytes_finished,
         }
     }
@@ -2319,12 +2289,8 @@ impl RemoteTimelineClientMetrics {
         op_kind: &RemoteOpKind,
         size: RemoteTimelineClientMetricsCallTrackSize,
     ) {
-        let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
-        debug_assert!(
-            calls_unfinished_metric.get() > 0,
-            "begin and end should cancel out"
-        );
-        calls_unfinished_metric.dec();
+        let calls_counter_pair = self.calls_counter_pair(file_kind, op_kind);
+        calls_counter_pair.dec();
         match size {
             RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {}
             RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
@@ -2341,18 +2307,15 @@ impl Drop for RemoteTimelineClientMetrics {
             shard_id,
             timeline_id,
             remote_physical_size_gauge,
-            calls_unfinished_gauge,
+            calls,
             bytes_started_counter,
             bytes_finished_counter,
         } = self;
-        for ((a, b), _) in calls_unfinished_gauge.get_mut().unwrap().drain() {
-            let _ = REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE.remove_label_values(&[
-                tenant_id,
-                shard_id,
-                timeline_id,
-                a,
-                b,
-            ]);
+        for ((a, b), _) in calls.get_mut().unwrap().drain() {
+            let mut res = [Ok(()), Ok(())];
+            REMOTE_TIMELINE_CLIENT_CALLS
+                .remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id, a, b]);
+            // don't care about results
         }
         for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
             let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
```
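The metric guard renamed above follows a common RAII pattern: record the call as finished on drop, unless the caller opts out because completion is reported on another code path, as `metric_end()` does in the next file's hunks. A condensed sketch of that pattern, with illustrative names rather than the actual pageserver types:

```rust
use metrics::IntCounterPair;

/// Records a call as finished when dropped, mirroring
/// `RemoteTimelineClientCallMetricGuard` above.
struct CallGuard {
    pair: Option<IntCounterPair>,
}

impl CallGuard {
    /// Defuse the drop behavior; the caller promises to call `dec()`
    /// itself later.
    fn will_decrement_manually(mut self) {
        self.pair.take();
    }
}

impl Drop for CallGuard {
    fn drop(&mut self) {
        if let Some(pair) = self.pair.take() {
            pair.dec(); // bumps the `finished` counter of the pair
        }
    }
}
```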
```diff
@@ -614,7 +614,7 @@ impl RemoteTimelineClient {
             metadata,
         );
         let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
-        self.calls_unfinished_metric_begin(&op);
+        self.metric_begin(&op);
         upload_queue.queued_operations.push_back(op);
         upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
@@ -654,7 +654,7 @@ impl RemoteTimelineClient {
             metadata.generation, metadata.shard
         );
         let op = UploadOp::UploadLayer(layer, metadata);
-        self.calls_unfinished_metric_begin(&op);
+        self.metric_begin(&op);
         upload_queue.queued_operations.push_back(op);
     }
@@ -830,7 +830,7 @@
         let op = UploadOp::Delete(Delete {
             layers: with_metadata,
         });
-        self.calls_unfinished_metric_begin(&op);
+        self.metric_begin(&op);
         upload_queue.queued_operations.push_back(op);
     }
@@ -1520,10 +1520,10 @@ impl RemoteTimelineClient {
             .await;
         }

-        self.calls_unfinished_metric_end(&task.op);
+        self.metric_end(&task.op);
     }

-    fn calls_unfinished_metric_impl(
+    fn metric_impl(
        &self,
        op: &UploadOp,
    ) -> Option<(
@@ -1560,17 +1560,17 @@ impl RemoteTimelineClient {
         Some(res)
     }

-    fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
-        let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
+    fn metric_begin(&self, op: &UploadOp) {
+        let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
             Some(x) => x,
             None => return,
         };
         let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
-        guard.will_decrement_manually(); // in unfinished_ops_metric_end()
+        guard.will_decrement_manually(); // in metric_end(), see right below
     }

-    fn calls_unfinished_metric_end(&self, op: &UploadOp) {
-        let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
+    fn metric_end(&self, op: &UploadOp) {
+        let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
             Some(x) => x,
             None => return,
         };
@@ -1655,7 +1655,7 @@ impl RemoteTimelineClient {

         // Tear down queued ops
         for op in qi.queued_operations.into_iter() {
-            self.calls_unfinished_metric_end(&op);
+            self.metric_end(&op);
             // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
             // which is exactly what we want to happen.
             drop(op);
```
```diff
@@ -54,7 +54,7 @@ class MetricsGetter:
         return results[0].value

     def get_metrics_values(
-        self, names: list[str], filter: Optional[Dict[str, str]] = None
+        self, names: list[str], filter: Optional[Dict[str, str]] = None, absence_ok=False
     ) -> Dict[str, float]:
         """
         When fetching multiple named metrics, it is more efficient to use this
@@ -63,6 +63,10 @@ class MetricsGetter:
         Throws RuntimeError if no metrics matching `names` are found, or if
         not all of `names` are found: this method is intended for loading sets
         of metrics whose existence is coupled.
+
+        If it's expected that there may be no results for some of the metrics,
+        specify `absence_ok=True`. The returned dict will then not contain values
+        for these metrics.
         """
         metrics = self.get_metrics()
         samples = []
@@ -75,9 +79,10 @@ class MetricsGetter:
                 raise RuntimeError(f"Multiple values found for {sample.name}")
             result[sample.name] = sample.value

-        if len(result) != len(names):
-            log.info(f"Metrics found: {metrics.metrics}")
-            raise RuntimeError(f"could not find all metrics {' '.join(names)}")
+        if not absence_ok:
+            if len(result) != len(names):
+                log.info(f"Metrics found: {metrics.metrics}")
+                raise RuntimeError(f"could not find all metrics {' '.join(names)}")

         return result
@@ -98,7 +103,8 @@ def histogram(prefix_without_trailing_underscore: str) -> List[str]:


 PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
-    "pageserver_remote_timeline_client_calls_unfinished",
+    "pageserver_remote_timeline_client_calls_started_total",
+    "pageserver_remote_timeline_client_calls_finished_total",
     "pageserver_remote_physical_size",
     "pageserver_remote_timeline_client_bytes_started_total",
     "pageserver_remote_timeline_client_bytes_finished_total",
@@ -127,7 +133,6 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
     *histogram("pageserver_getpage_get_reconstruct_data_seconds"),
     *histogram("pageserver_wait_lsn_seconds"),
     *histogram("pageserver_remote_operation_seconds"),
-    *histogram("pageserver_remote_timeline_client_calls_started"),
     *histogram("pageserver_io_operations_seconds"),
     "pageserver_tenant_states_count",
 )
```
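A usage sketch of the extended fixture method; `getter`, `tenant_id`, and the surrounding test context are hypothetical, while the metric names and the `absence_ok` semantics are the ones introduced in this PR:

```python
# With absence_ok=True the call no longer raises when the metrics have been
# removed (e.g. after a timeline deletion); missing names are simply absent
# from the returned dict.
names = [
    "pageserver_remote_timeline_client_calls_started_total",
    "pageserver_remote_timeline_client_calls_finished_total",
]
res = getter.get_metrics_values(names, filter={"tenant_id": tenant_id}, absence_ok=True)
if len(res) == len(names):
    queue_depth = int(res[names[0]]) - int(res[names[1]])
```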
```diff
@@ -694,32 +694,33 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
             },
         ).value

-    def get_remote_timeline_client_metric(
+    def get_remote_timeline_client_queue_count(
         self,
-        metric_name: str,
         tenant_id: TenantId,
         timeline_id: TimelineId,
         file_kind: str,
         op_kind: str,
-    ) -> Optional[float]:
-        metrics = self.get_metrics()
-        matches = metrics.query_all(
-            name=metric_name,
+    ) -> Optional[int]:
+        metrics = [
+            "pageserver_remote_timeline_client_calls_started_total",
+            "pageserver_remote_timeline_client_calls_finished_total",
+        ]
+        res = self.get_metrics_values(
+            metrics,
             filter={
                 "tenant_id": str(tenant_id),
                 "timeline_id": str(timeline_id),
                 "file_kind": str(file_kind),
                 "op_kind": str(op_kind),
             },
+            absence_ok=True,
         )
-        if len(matches) == 0:
-            value = None
-        elif len(matches) == 1:
-            value = matches[0].value
-            assert value is not None
-        else:
-            assert len(matches) < 2, "above filter should uniquely identify metric"
-        return value
+        if len(res) != 2:
+            return None
+        inc, dec = [res[metric] for metric in metrics]
+        queue_count = int(inc) - int(dec)
+        assert queue_count >= 0
+        return queue_count

     def layer_map_info(
         self,
```
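And a sketch of how a test might consume the new helper (hypothetical surrounding code; the call signature is the one defined above):

```python
# None means the metrics are gone, e.g. the timeline was deleted;
# otherwise the value is the non-negative upload-queue depth.
count = client.get_remote_timeline_client_queue_count(
    tenant_id, timeline_id, file_kind="index", op_kind="upload"
)
assert count is None or count >= 0
```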
```diff
@@ -1,5 +1,5 @@
 import time
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Tuple, Union

 from mypy_boto3_s3.type_defs import (
     DeleteObjectOutputTypeDef,
@@ -221,16 +221,40 @@ def wait_for_upload_queue_empty(
 ):
     while True:
         all_metrics = pageserver_http.get_metrics()
-        tl = all_metrics.query_all(
-            "pageserver_remote_timeline_client_calls_unfinished",
+        started = all_metrics.query_all(
+            "pageserver_remote_timeline_client_calls_started_total",
             {
                 "tenant_id": str(tenant_id),
                 "timeline_id": str(timeline_id),
             },
         )
-        assert len(tl) > 0
-        log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")
-        if all(m.value == 0 for m in tl):
+        finished = all_metrics.query_all(
+            "pageserver_remote_timeline_client_calls_finished_total",
+            {
+                "tenant_id": str(tenant_id),
+                "timeline_id": str(timeline_id),
+            },
+        )
+        assert len(started) == len(finished)
+        # this is `started left join finished`; on a match, subtract finished from started, giving the queue depth
+        remaining_labels = ["shard_id", "file_kind", "op_kind"]
+        tl: List[Tuple[Any, float]] = []
+        for s in started:
+            found = False
+            for f in finished:
+                if all([s.labels[label] == f.labels[label] for label in remaining_labels]):
+                    assert (
+                        not found
+                    ), "duplicate match, remaining_labels don't uniquely identify sample"
+                    tl.append((s.labels, int(s.value) - int(f.value)))
+                    found = True
+            if not found:
+                tl.append((s.labels, int(s.value)))
+        assert len(tl) == len(started), "something broken with join logic"
+        log.info(f"upload queue for {tenant_id}/{timeline_id}:")
+        for labels, queue_count in tl:
+            log.info(f"  {labels}: {queue_count}")
+        if all(queue_count == 0 for (_, queue_count) in tl):
             return
         time.sleep(0.2)
```
```diff
@@ -274,15 +274,9 @@ def test_remote_storage_upload_queue_retries(
     wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)

     def get_queued_count(file_kind, op_kind):
-        val = client.get_remote_timeline_client_metric(
-            "pageserver_remote_timeline_client_calls_unfinished",
-            tenant_id,
-            timeline_id,
-            file_kind,
-            op_kind,
+        return client.get_remote_timeline_client_queue_count(
+            tenant_id, timeline_id, file_kind, op_kind
         )
-        assert val is not None, "expecting metric to be present"
-        return int(val)

     # create some layers & wait for uploads to finish
     overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
@@ -434,7 +428,7 @@ def test_remote_timeline_client_calls_started_metric(
     assert timeline_id is not None
     for (file_kind, op_kind), observations in calls_started.items():
         val = client.get_metric_value(
-            name="pageserver_remote_timeline_client_calls_started_count",
+            name="pageserver_remote_timeline_client_calls_started_total",
             filter={
                 "file_kind": str(file_kind),
                 "op_kind": str(op_kind),
@@ -537,16 +531,6 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(

     client = env.pageserver.http_client()

-    def get_queued_count(file_kind, op_kind):
-        val = client.get_remote_timeline_client_metric(
-            "pageserver_remote_timeline_client_calls_unfinished",
-            tenant_id,
-            timeline_id,
-            file_kind,
-            op_kind,
-        )
-        return int(val) if val is not None else val
-
     endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)

     client.configure_failpoints(("before-upload-layer", "return"))
@@ -580,7 +564,10 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
     def assert_compacted_and_uploads_queued():
         assert timeline_path.exists()
         assert len(list(timeline_path.glob("*"))) >= 8
-        assert get_queued_count(file_kind="index", op_kind="upload") > 0
+        assert (
+            get_queued_count(client, tenant_id, timeline_id, file_kind="index", op_kind="upload")
+            > 0
+        )

     wait_until(20, 0.1, assert_compacted_and_uploads_queued)
@@ -618,7 +605,10 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
         assert len(filtered) == 0

     # timeline deletion should kill ongoing uploads, so, the metric will be gone
-    assert get_queued_count(file_kind="index", op_kind="upload") is None
+    assert (
+        get_queued_count(client, tenant_id, timeline_id, file_kind="index", op_kind="upload")
+        is None
+    )

     # timeline deletion should be unblocking checkpoint ops
     checkpoint_thread.join(2.0)
@@ -919,16 +909,8 @@ def get_queued_count(
     file_kind: str,
     op_kind: str,
 ):
-    val = client.get_remote_timeline_client_metric(
-        "pageserver_remote_timeline_client_calls_unfinished",
-        tenant_id,
-        timeline_id,
-        file_kind,
-        op_kind,
-    )
-    if val is None:
-        return val
-    return int(val)
+    """The most important aspect of this function is shorter name & no return type so asserts are more concise."""
+    return client.get_remote_timeline_client_queue_count(tenant_id, timeline_id, file_kind, op_kind)


 def assert_nothing_to_upload(
```