Compare commits

...

4 Commits

Author SHA1 Message Date
Vlad Lazar
82e3866c27 pageserver: rate limited warning on too many layers visited 2024-04-22 17:32:12 +01:00
Vlad Lazar
7a8effc190 tests: add a compaction smoke test 2024-04-22 17:32:12 +01:00
Vlad Lazar
35e299d308 pageserver: add a metric counting layer visits on vectored read path 2024-04-22 17:32:10 +01:00
Vlad Lazar
ea5b36c47a pageserver: tweak metric counting layer visits on non-vectored read path
* Give it a better name
* Track all layers
* Larger buckets
2024-04-22 17:32:05 +01:00
6 changed files with 178 additions and 9 deletions

View File

@@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
pub struct RateLimit {
last: Option<Instant>,
interval: Duration,
rate_limited: u32,
}
impl RateLimit {
@@ -12,9 +13,16 @@ impl RateLimit {
Self {
last: None,
interval,
rate_limited: 0,
}
}
/// Returns the number of calls that were rate limited
/// since the last one was allowed.
pub fn rate_limited(&self) -> u32 {
self.rate_limited
}
/// Call `f` if the rate limit allows.
/// Don't call it otherwise.
pub fn call<F: FnOnce()>(&mut self, f: F) {
@@ -22,9 +30,13 @@ impl RateLimit {
match self.last {
Some(last) if now - last <= self.interval => {
// ratelimit
if let Some(updated) = self.rate_limited.checked_add(1) {
self.rate_limited = updated;
}
}
_ => {
self.last = Some(now);
self.rate_limited = 0;
f();
}
}
@@ -50,17 +62,24 @@ mod tests {
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
assert_eq!(f.rate_limited(), 0);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
assert_eq!(f.rate_limited(), 1);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
assert_eq!(f.rate_limited(), 2);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
assert_eq!(f.rate_limited(), 0);
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
assert_eq!(f.rate_limited(), 1);
f.call(cl);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 3);
assert_eq!(f.rate_limited(), 0);
}
}

View File

@@ -86,11 +86,20 @@ pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static READ_NUM_FS_LAYERS: Lazy<Histogram> = Lazy::new(|| {
pub(crate) static READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_read_num_fs_layers",
"Number of persistent layers accessed for processing a read request, including those in the cache",
vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 10.0, 20.0, 50.0, 100.0],
"pageserver_layers_visited_per_read_global",
"Number of layers visited to reconstruct one key",
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
)
.expect("failed to define a metric")
});
pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_layers_visited_per_vectored_read_global",
"Average number of layers visited to reconstruct one key",
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
)
.expect("failed to define a metric")
});
@@ -2771,7 +2780,8 @@ pub fn preinitialize_metrics() {
// histograms
[
&READ_NUM_FS_LAYERS,
&READ_NUM_LAYERS_VISITED,
&VEC_READ_NUM_LAYERS_VISITED,
&WAIT_LSN_TIME,
&WAL_REDO_TIME,
&WAL_REDO_RECORDS_HISTOGRAM,

View File

@@ -118,6 +118,7 @@ pub(crate) struct ValuesReconstructState {
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
keys_done: KeySpaceRandomAccum,
layers_visited: u32,
}
impl ValuesReconstructState {
@@ -125,6 +126,7 @@ impl ValuesReconstructState {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
layers_visited: 0,
}
}
@@ -138,6 +140,14 @@ impl ValuesReconstructState {
}
}
pub(crate) fn on_layer_visited(&mut self) {
self.layers_visited += 1;
}
pub(crate) fn get_layers_visited(&self) -> u32 {
self.layers_visited
}
/// Update the state collected for a given key.
/// Returns true if this was the last value needed for the key and false otherwise.
///

View File

@@ -824,6 +824,10 @@ impl Timeline {
}
pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
// If we are visiting more than 50 layers to reconstruct one value,
// it's likely that something is wrong with compaction. We print a
// rate limited warning in that case.
pub(crate) const LAYERS_VISITED_WARN_THRESHOLD: u64 = 50;
/// Look up multiple page versions at a given LSN
///
@@ -957,6 +961,7 @@ impl Timeline {
.await?;
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
for (key, res) in reconstruct_state.keys {
match res {
Err(err) => {
@@ -971,6 +976,25 @@ impl Timeline {
}
}
// Note that this is an approximation. Tracking the exact number of layers visited
// per key requires virtually unbounded memory usage and is inefficient
// (i.e. segment tree tracking each range queried from a layer)
let average_layers_visited = layers_visited as f64 / results.len() as f64;
crate::metrics::VEC_READ_NUM_LAYERS_VISITED.observe(average_layers_visited);
if average_layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD as f64 {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
let rate_limited = rate_limit.rate_limited();
rate_limit.call(|| {
warn!("Too many layers visited by vectored read: {} >= {}; rate limited {} occurences",
average_layers_visited, Self::LAYERS_VISITED_WARN_THRESHOLD, rate_limited);
});
}
Ok(results)
}
@@ -2794,7 +2818,18 @@ impl Timeline {
let mut timeline = self;
let mut read_count = scopeguard::guard(0, |cnt| {
crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
crate::metrics::READ_NUM_LAYERS_VISITED.observe(cnt as f64);
if cnt >= Self::LAYERS_VISITED_WARN_THRESHOLD {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
let rate_limited = rate_limit.rate_limited();
rate_limit.call(|| {
warn!("Too many layers visited by non-vectored read: {} >= {}; rate limited {} occurences",
cnt, Self::LAYERS_VISITED_WARN_THRESHOLD, rate_limited);
});
}
});
// For debugging purposes, collect the path of layers that we traversed
@@ -2909,7 +2944,7 @@ impl Timeline {
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
// metrics: open_layer does not count as fs access, so we are not updating `read_count`
*read_count += 1;
traversal_path.push((result, cont_lsn, open_layer.traversal_id()));
continue 'outer;
}
@@ -2936,7 +2971,7 @@ impl Timeline {
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
// metrics: open_layer does not count as fs access, so we are not updating `read_count`
*read_count += 1;
traversal_path.push((result, cont_lsn, frozen_layer.traversal_id()));
continue 'outer;
}
@@ -3129,6 +3164,8 @@ impl Timeline {
unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited();
} else {
break;
}

View File

@@ -129,7 +129,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_getpage_reconstruct_seconds_sum",
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
*histogram("pageserver_smgr_query_seconds_global"),
*histogram("pageserver_read_num_fs_layers"),
*histogram("pageserver_layers_visited_per_read_global"),
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
*histogram("pageserver_wait_lsn_seconds"),
*histogram("pageserver_remote_operation_seconds"),

View File

@@ -0,0 +1,93 @@
import os
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.workload import Workload
AGGRESIVE_COMPACTION_TENANT_CONF = {
# Disable gc and compaction. The test runs compaction manually.
"gc_period": "0s",
"compaction_period": "0s",
# Small checkpoint distance to create many layers
"checkpoint_distance": 1024**2,
# Compact small layers
"compaction_target_size": 1024**2,
"image_creation_threshold": 2,
# INC-186: remove when merging the fix
"image_layer_creation_check_threshold": 0,
}
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder):
"""
This is a smoke test that compaction kicks in. The workload repeatedly churns
a small number of rows and manually instructs the pageserver to run compaction
between iterations. At the end of the test validate that the average number of
layers visited to gather reconstruct data for a given key is within the empirically
observed bounds.
"""
# Effectively disable the page cache to rely only on image layers
# to shorten reads.
neon_env_builder.pageserver_config_override = """
page_cache_size=10
"""
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
row_count = 10000
churn_rounds = 100
ps_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageserver.id)
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
for i in range(1, churn_rounds + 1):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload.churn_rows(row_count, env.pageserver.id)
ps_http.timeline_compact(tenant_id, timeline_id)
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
log.info("Checking layer access metrics ...")
layer_access_metric_names = [
"pageserver_layers_visited_per_read_global_sum",
"pageserver_layers_visited_per_read_global_count",
"pageserver_layers_visited_per_read_global_bucket",
"pageserver_layers_visited_per_vectored_read_global_sum",
"pageserver_layers_visited_per_vectored_read_global_count",
"pageserver_layers_visited_per_vectored_read_global_bucket",
]
metrics = env.pageserver.http_client().get_metrics()
for name in layer_access_metric_names:
layer_access_metrics = metrics.query_all(name)
log.info(f"Got metrics: {layer_access_metrics}")
non_vectored_sum = metrics.query_one("pageserver_layers_visited_per_read_global_sum")
non_vectored_count = metrics.query_one("pageserver_layers_visited_per_read_global_count")
non_vectored_average = non_vectored_sum / non_vectored_count
vectored_sum = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_sum")
vectored_count = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_count")
vectored_average = vectored_sum / vectored_count
log.info(f"{non_vectored_average=} {vectored_average=}")
# The upper bound for average number of layer visits below (8)
# was chosen empirically for this workload.
assert non_vectored_average < 8
assert vectored_average < 8