mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
Compare commits
4 Commits
quantumish
...
vlad/read-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
82e3866c27 | ||
|
|
7a8effc190 | ||
|
|
35e299d308 | ||
|
|
ea5b36c47a |
@@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
|
||||
pub struct RateLimit {
|
||||
last: Option<Instant>,
|
||||
interval: Duration,
|
||||
rate_limited: u32,
|
||||
}
|
||||
|
||||
impl RateLimit {
|
||||
@@ -12,9 +13,16 @@ impl RateLimit {
|
||||
Self {
|
||||
last: None,
|
||||
interval,
|
||||
rate_limited: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of calls that were rate limited
|
||||
/// since the last one was allowed.
|
||||
pub fn rate_limited(&self) -> u32 {
|
||||
self.rate_limited
|
||||
}
|
||||
|
||||
/// Call `f` if the rate limit allows.
|
||||
/// Don't call it otherwise.
|
||||
pub fn call<F: FnOnce()>(&mut self, f: F) {
|
||||
@@ -22,9 +30,13 @@ impl RateLimit {
|
||||
match self.last {
|
||||
Some(last) if now - last <= self.interval => {
|
||||
// ratelimit
|
||||
if let Some(updated) = self.rate_limited.checked_add(1) {
|
||||
self.rate_limited = updated;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
self.last = Some(now);
|
||||
self.rate_limited = 0;
|
||||
f();
|
||||
}
|
||||
}
|
||||
@@ -50,17 +62,24 @@ mod tests {
|
||||
|
||||
f.call(cl);
|
||||
assert_eq!(called.load(Relaxed), 1);
|
||||
assert_eq!(f.rate_limited(), 0);
|
||||
f.call(cl);
|
||||
assert_eq!(called.load(Relaxed), 1);
|
||||
assert_eq!(f.rate_limited(), 1);
|
||||
f.call(cl);
|
||||
assert_eq!(called.load(Relaxed), 1);
|
||||
assert_eq!(f.rate_limited(), 2);
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
f.call(cl);
|
||||
assert_eq!(called.load(Relaxed), 2);
|
||||
assert_eq!(f.rate_limited(), 0);
|
||||
f.call(cl);
|
||||
assert_eq!(called.load(Relaxed), 2);
|
||||
assert_eq!(f.rate_limited(), 1);
|
||||
f.call(cl);
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
f.call(cl);
|
||||
assert_eq!(called.load(Relaxed), 3);
|
||||
assert_eq!(f.rate_limited(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,11 +86,20 @@ pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static READ_NUM_FS_LAYERS: Lazy<Histogram> = Lazy::new(|| {
|
||||
pub(crate) static READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_read_num_fs_layers",
|
||||
"Number of persistent layers accessed for processing a read request, including those in the cache",
|
||||
vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 10.0, 20.0, 50.0, 100.0],
|
||||
"pageserver_layers_visited_per_read_global",
|
||||
"Number of layers visited to reconstruct one key",
|
||||
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_layers_visited_per_vectored_read_global",
|
||||
"Average number of layers visited to reconstruct one key",
|
||||
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
@@ -2771,7 +2780,8 @@ pub fn preinitialize_metrics() {
|
||||
|
||||
// histograms
|
||||
[
|
||||
&READ_NUM_FS_LAYERS,
|
||||
&READ_NUM_LAYERS_VISITED,
|
||||
&VEC_READ_NUM_LAYERS_VISITED,
|
||||
&WAIT_LSN_TIME,
|
||||
&WAL_REDO_TIME,
|
||||
&WAL_REDO_RECORDS_HISTOGRAM,
|
||||
|
||||
@@ -118,6 +118,7 @@ pub(crate) struct ValuesReconstructState {
|
||||
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
|
||||
|
||||
keys_done: KeySpaceRandomAccum,
|
||||
layers_visited: u32,
|
||||
}
|
||||
|
||||
impl ValuesReconstructState {
|
||||
@@ -125,6 +126,7 @@ impl ValuesReconstructState {
|
||||
Self {
|
||||
keys: HashMap::new(),
|
||||
keys_done: KeySpaceRandomAccum::new(),
|
||||
layers_visited: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,6 +140,14 @@ impl ValuesReconstructState {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_layer_visited(&mut self) {
|
||||
self.layers_visited += 1;
|
||||
}
|
||||
|
||||
pub(crate) fn get_layers_visited(&self) -> u32 {
|
||||
self.layers_visited
|
||||
}
|
||||
|
||||
/// Update the state collected for a given key.
|
||||
/// Returns true if this was the last value needed for the key and false otherwise.
|
||||
///
|
||||
|
||||
@@ -824,6 +824,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
|
||||
// If we are visiting more than 50 layers to reconstruct one value,
|
||||
// it's likely that something is wrong with compaction. We print a
|
||||
// rate limited warning in that case.
|
||||
pub(crate) const LAYERS_VISITED_WARN_THRESHOLD: u64 = 50;
|
||||
|
||||
/// Look up multiple page versions at a given LSN
|
||||
///
|
||||
@@ -957,6 +961,7 @@ impl Timeline {
|
||||
.await?;
|
||||
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
for (key, res) in reconstruct_state.keys {
|
||||
match res {
|
||||
Err(err) => {
|
||||
@@ -971,6 +976,25 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
// Note that this is an approximation. Tracking the exact number of layers visited
|
||||
// per key requires virtually unbounded memory usage and is inefficient
|
||||
// (i.e. segment tree tracking each range queried from a layer)
|
||||
let average_layers_visited = layers_visited as f64 / results.len() as f64;
|
||||
|
||||
crate::metrics::VEC_READ_NUM_LAYERS_VISITED.observe(average_layers_visited);
|
||||
|
||||
if average_layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD as f64 {
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LOGGED: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
|
||||
let mut rate_limit = LOGGED.lock().unwrap();
|
||||
let rate_limited = rate_limit.rate_limited();
|
||||
rate_limit.call(|| {
|
||||
warn!("Too many layers visited by vectored read: {} >= {}; rate limited {} occurences",
|
||||
average_layers_visited, Self::LAYERS_VISITED_WARN_THRESHOLD, rate_limited);
|
||||
});
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
@@ -2794,7 +2818,18 @@ impl Timeline {
|
||||
let mut timeline = self;
|
||||
|
||||
let mut read_count = scopeguard::guard(0, |cnt| {
|
||||
crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
|
||||
crate::metrics::READ_NUM_LAYERS_VISITED.observe(cnt as f64);
|
||||
if cnt >= Self::LAYERS_VISITED_WARN_THRESHOLD {
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LOGGED: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
|
||||
let mut rate_limit = LOGGED.lock().unwrap();
|
||||
let rate_limited = rate_limit.rate_limited();
|
||||
rate_limit.call(|| {
|
||||
warn!("Too many layers visited by non-vectored read: {} >= {}; rate limited {} occurences",
|
||||
cnt, Self::LAYERS_VISITED_WARN_THRESHOLD, rate_limited);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// For debugging purposes, collect the path of layers that we traversed
|
||||
@@ -2909,7 +2944,7 @@ impl Timeline {
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
// metrics: open_layer does not count as fs access, so we are not updating `read_count`
|
||||
*read_count += 1;
|
||||
traversal_path.push((result, cont_lsn, open_layer.traversal_id()));
|
||||
continue 'outer;
|
||||
}
|
||||
@@ -2936,7 +2971,7 @@ impl Timeline {
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
// metrics: open_layer does not count as fs access, so we are not updating `read_count`
|
||||
*read_count += 1;
|
||||
traversal_path.push((result, cont_lsn, frozen_layer.traversal_id()));
|
||||
continue 'outer;
|
||||
}
|
||||
@@ -3129,6 +3164,8 @@ impl Timeline {
|
||||
|
||||
unmapped_keyspace = keyspace_to_read;
|
||||
cont_lsn = next_cont_lsn;
|
||||
|
||||
reconstruct_state.on_layer_visited();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_getpage_reconstruct_seconds_sum",
|
||||
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
*histogram("pageserver_smgr_query_seconds_global"),
|
||||
*histogram("pageserver_read_num_fs_layers"),
|
||||
*histogram("pageserver_layers_visited_per_read_global"),
|
||||
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
|
||||
*histogram("pageserver_wait_lsn_seconds"),
|
||||
*histogram("pageserver_remote_operation_seconds"),
|
||||
|
||||
93
test_runner/regress/test_compaction.py
Normal file
93
test_runner/regress/test_compaction.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import os
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.workload import Workload
|
||||
|
||||
AGGRESIVE_COMPACTION_TENANT_CONF = {
|
||||
# Disable gc and compaction. The test runs compaction manually.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
# Small checkpoint distance to create many layers
|
||||
"checkpoint_distance": 1024**2,
|
||||
# Compact small layers
|
||||
"compaction_target_size": 1024**2,
|
||||
"image_creation_threshold": 2,
|
||||
# INC-186: remove when merging the fix
|
||||
"image_layer_creation_check_threshold": 0,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
|
||||
def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
This is a smoke test that compaction kicks in. The workload repeatedly churns
|
||||
a small number of rows and manually instructs the pageserver to run compaction
|
||||
between iterations. At the end of the test validate that the average number of
|
||||
layers visited to gather reconstruct data for a given key is within the empirically
|
||||
observed bounds.
|
||||
"""
|
||||
|
||||
# Effectively disable the page cache to rely only on image layers
|
||||
# to shorten reads.
|
||||
neon_env_builder.pageserver_config_override = """
|
||||
page_cache_size=10
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
row_count = 10000
|
||||
churn_rounds = 100
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init(env.pageserver.id)
|
||||
|
||||
log.info("Writing initial data ...")
|
||||
workload.write_rows(row_count, env.pageserver.id)
|
||||
|
||||
for i in range(1, churn_rounds + 1):
|
||||
if i % 10 == 0:
|
||||
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
||||
|
||||
workload.churn_rows(row_count, env.pageserver.id)
|
||||
ps_http.timeline_compact(tenant_id, timeline_id)
|
||||
|
||||
log.info("Validating at workload end ...")
|
||||
workload.validate(env.pageserver.id)
|
||||
|
||||
log.info("Checking layer access metrics ...")
|
||||
|
||||
layer_access_metric_names = [
|
||||
"pageserver_layers_visited_per_read_global_sum",
|
||||
"pageserver_layers_visited_per_read_global_count",
|
||||
"pageserver_layers_visited_per_read_global_bucket",
|
||||
"pageserver_layers_visited_per_vectored_read_global_sum",
|
||||
"pageserver_layers_visited_per_vectored_read_global_count",
|
||||
"pageserver_layers_visited_per_vectored_read_global_bucket",
|
||||
]
|
||||
|
||||
metrics = env.pageserver.http_client().get_metrics()
|
||||
for name in layer_access_metric_names:
|
||||
layer_access_metrics = metrics.query_all(name)
|
||||
log.info(f"Got metrics: {layer_access_metrics}")
|
||||
|
||||
non_vectored_sum = metrics.query_one("pageserver_layers_visited_per_read_global_sum")
|
||||
non_vectored_count = metrics.query_one("pageserver_layers_visited_per_read_global_count")
|
||||
non_vectored_average = non_vectored_sum / non_vectored_count
|
||||
|
||||
vectored_sum = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_sum")
|
||||
vectored_count = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_count")
|
||||
vectored_average = vectored_sum / vectored_count
|
||||
|
||||
log.info(f"{non_vectored_average=} {vectored_average=}")
|
||||
|
||||
# The upper bound for average number of layer visits below (8)
|
||||
# was chosen empirically for this workload.
|
||||
assert non_vectored_average < 8
|
||||
assert vectored_average < 8
|
||||
Reference in New Issue
Block a user