mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 05:50:38 +00:00
Merge branch 'main' into khanova-test
This commit is contained in:
@@ -256,7 +256,16 @@ fn update_rusage_metrics() {
|
||||
DISK_IO_BYTES
|
||||
.with_label_values(&["write"])
|
||||
.set(rusage_stats.ru_oublock * BYTES_IN_BLOCK);
|
||||
MAXRSS_KB.set(rusage_stats.ru_maxrss);
|
||||
|
||||
// On macOS, the unit of maxrss is bytes; on Linux, it's kilobytes. https://stackoverflow.com/a/59915669
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
MAXRSS_KB.set(rusage_stats.ru_maxrss / 1024);
|
||||
}
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
{
|
||||
MAXRSS_KB.set(rusage_stats.ru_maxrss);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_rusage_stats() -> libc::rusage {
|
||||
|
||||
@@ -162,6 +162,10 @@ impl KeySpace {
|
||||
.sum()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.total_size() == 0
|
||||
}
|
||||
|
||||
fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
|
||||
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
|
||||
Ok(0) => None,
|
||||
|
||||
@@ -105,31 +105,39 @@ pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
|
||||
});
|
||||
|
||||
// Metrics collected on operations on the storage repository.
|
||||
#[derive(
|
||||
Clone, Copy, enum_map::Enum, strum_macros::EnumString, strum_macros::Display, IntoStaticStr,
|
||||
)]
|
||||
pub(crate) enum GetKind {
|
||||
Singular,
|
||||
Vectored,
|
||||
}
|
||||
|
||||
pub(crate) struct ReconstructTimeMetrics {
|
||||
ok: Histogram,
|
||||
err: Histogram,
|
||||
singular: Histogram,
|
||||
vectored: Histogram,
|
||||
}
|
||||
|
||||
pub(crate) static RECONSTRUCT_TIME: Lazy<ReconstructTimeMetrics> = Lazy::new(|| {
|
||||
let inner = register_histogram_vec!(
|
||||
"pageserver_getpage_reconstruct_seconds",
|
||||
"Time spent in reconstruct_value (reconstruct a page from deltas)",
|
||||
&["result"],
|
||||
&["get_kind"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
|
||||
ReconstructTimeMetrics {
|
||||
ok: inner.get_metric_with_label_values(&["ok"]).unwrap(),
|
||||
err: inner.get_metric_with_label_values(&["err"]).unwrap(),
|
||||
singular: inner.with_label_values(&[GetKind::Singular.into()]),
|
||||
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
|
||||
}
|
||||
});
|
||||
|
||||
impl ReconstructTimeMetrics {
|
||||
pub(crate) fn for_result<T, E>(&self, result: &Result<T, E>) -> &Histogram {
|
||||
match result {
|
||||
Ok(_) => &self.ok,
|
||||
Err(_) => &self.err,
|
||||
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
|
||||
match get_kind {
|
||||
GetKind::Singular => &self.singular,
|
||||
GetKind::Vectored => &self.vectored,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -142,13 +150,33 @@ pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::n
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
pub(crate) struct ReconstructDataTimeMetrics {
|
||||
singular: Histogram,
|
||||
vectored: Histogram,
|
||||
}
|
||||
|
||||
impl ReconstructDataTimeMetrics {
|
||||
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
|
||||
match get_kind {
|
||||
GetKind::Singular => &self.singular,
|
||||
GetKind::Vectored => &self.vectored,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<ReconstructDataTimeMetrics> = Lazy::new(|| {
|
||||
let inner = register_histogram_vec!(
|
||||
"pageserver_getpage_get_reconstruct_data_seconds",
|
||||
"Time spent in get_reconstruct_value_data",
|
||||
&["get_kind"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
.expect("failed to define a metric");
|
||||
|
||||
ReconstructDataTimeMetrics {
|
||||
singular: inner.with_label_values(&[GetKind::Singular.into()]),
|
||||
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
|
||||
}
|
||||
});
|
||||
|
||||
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
|
||||
@@ -86,7 +86,7 @@ use crate::{
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::metrics::{
|
||||
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
|
||||
GetKind, TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
|
||||
};
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
@@ -797,7 +797,9 @@ impl Timeline {
|
||||
img: cached_page_img,
|
||||
};
|
||||
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
|
||||
.for_get_kind(GetKind::Singular)
|
||||
.start_timer();
|
||||
let path = self
|
||||
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
@@ -807,7 +809,7 @@ impl Timeline {
|
||||
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
|
||||
let elapsed = start.elapsed();
|
||||
crate::metrics::RECONSTRUCT_TIME
|
||||
.for_result(&res)
|
||||
.for_get_kind(GetKind::Singular)
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if cfg!(feature = "testing") && res.is_err() {
|
||||
@@ -969,9 +971,22 @@ impl Timeline {
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
|
||||
let get_kind = if keyspace.total_size() == 1 {
|
||||
GetKind::Singular
|
||||
} else {
|
||||
GetKind::Vectored
|
||||
};
|
||||
|
||||
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
get_data_timer.stop_and_record();
|
||||
|
||||
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
for (key, res) in reconstruct_state.keys {
|
||||
@@ -987,6 +1002,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
}
|
||||
reconstruct_timer.stop_and_record();
|
||||
|
||||
// Note that this is an approximation. Tracking the exact number of layers visited
|
||||
// per key requires virtually unbounded memory usage and is inefficient
|
||||
@@ -3127,55 +3143,61 @@ impl Timeline {
|
||||
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
|
||||
completed_keyspace.merge(&keys_done_last_step);
|
||||
|
||||
let guard = timeline.layers.read().await;
|
||||
let layers = guard.layer_map();
|
||||
// Do not descent any further if the last layer we visited
|
||||
// completed all keys in the keyspace it inspected. This is not
|
||||
// required for correctness, but avoids visiting extra layers
|
||||
// which turns out to be a perf bottleneck in some cases.
|
||||
if !unmapped_keyspace.is_empty() {
|
||||
let guard = timeline.layers.read().await;
|
||||
let layers = guard.layer_map();
|
||||
|
||||
let in_memory_layer = layers.find_in_memory_layer(|l| {
|
||||
let start_lsn = l.get_lsn_range().start;
|
||||
cont_lsn > start_lsn
|
||||
});
|
||||
let in_memory_layer = layers.find_in_memory_layer(|l| {
|
||||
let start_lsn = l.get_lsn_range().start;
|
||||
cont_lsn > start_lsn
|
||||
});
|
||||
|
||||
match in_memory_layer {
|
||||
Some(l) => {
|
||||
let lsn_range = l.get_lsn_range().start..cont_lsn;
|
||||
fringe.update(
|
||||
ReadableLayer::InMemoryLayer(l),
|
||||
unmapped_keyspace.clone(),
|
||||
lsn_range,
|
||||
);
|
||||
}
|
||||
None => {
|
||||
for range in unmapped_keyspace.ranges.iter() {
|
||||
let results = layers.range_search(range.clone(), cont_lsn);
|
||||
match in_memory_layer {
|
||||
Some(l) => {
|
||||
let lsn_range = l.get_lsn_range().start..cont_lsn;
|
||||
fringe.update(
|
||||
ReadableLayer::InMemoryLayer(l),
|
||||
unmapped_keyspace.clone(),
|
||||
lsn_range,
|
||||
);
|
||||
}
|
||||
None => {
|
||||
for range in unmapped_keyspace.ranges.iter() {
|
||||
let results = layers.range_search(range.clone(), cont_lsn);
|
||||
|
||||
results
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
|
||||
(
|
||||
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
|
||||
keyspace_accum.to_keyspace(),
|
||||
lsn_floor..cont_lsn,
|
||||
)
|
||||
})
|
||||
.for_each(|(layer, keyspace, lsn_range)| {
|
||||
fringe.update(layer, keyspace, lsn_range)
|
||||
});
|
||||
results
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
|
||||
(
|
||||
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
|
||||
keyspace_accum.to_keyspace(),
|
||||
lsn_floor..cont_lsn,
|
||||
)
|
||||
})
|
||||
.for_each(|(layer, keyspace, lsn_range)| {
|
||||
fringe.update(layer, keyspace, lsn_range)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// It's safe to drop the layer map lock after planning the next round of reads.
|
||||
// The fringe keeps readable handles for the layers which are safe to read even
|
||||
// if layers were compacted or flushed.
|
||||
//
|
||||
// The more interesting consideration is: "Why is the read algorithm still correct
|
||||
// if the layer map changes while it is operating?". Doing a vectored read on a
|
||||
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
|
||||
// covered by the read. The layer map tells us how to move the lsn downwards for a
|
||||
// range at *a particular point in time*. It is fine for the answer to be different
|
||||
// at two different time points.
|
||||
drop(guard);
|
||||
// It's safe to drop the layer map lock after planning the next round of reads.
|
||||
// The fringe keeps readable handles for the layers which are safe to read even
|
||||
// if layers were compacted or flushed.
|
||||
//
|
||||
// The more interesting consideration is: "Why is the read algorithm still correct
|
||||
// if the layer map changes while it is operating?". Doing a vectored read on a
|
||||
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
|
||||
// covered by the read. The layer map tells us how to move the lsn downwards for a
|
||||
// range at *a particular point in time*. It is fine for the answer to be different
|
||||
// at two different time points.
|
||||
drop(guard);
|
||||
}
|
||||
|
||||
if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
|
||||
let next_cont_lsn = lsn_range.start;
|
||||
|
||||
@@ -403,7 +403,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
|
||||
client_tasks.spawn(usage_metrics::task_backup(
|
||||
&metrics_config.backup_metric_collection_config,
|
||||
cancellation_token,
|
||||
cancellation_token.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
@@ -423,7 +423,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
let cache = api.caches.endpoints_cache.clone();
|
||||
let con = regional_redis_client;
|
||||
let span = tracing::info_span!("endpoints_cache");
|
||||
maintenance_tasks.spawn(async move { cache.do_read(con).await }.instrument(span));
|
||||
maintenance_tasks.spawn(
|
||||
async move { cache.do_read(con, cancellation_token.clone()).await }
|
||||
.instrument(span),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
12
proxy/src/cache/endpoints.rs
vendored
12
proxy/src/cache/endpoints.rs
vendored
@@ -4,6 +4,7 @@ use std::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use dashmap::DashSet;
|
||||
@@ -13,6 +14,7 @@ use redis::{
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
|
||||
use crate::{
|
||||
@@ -111,16 +113,22 @@ impl EndpointsCache {
|
||||
pub async fn do_read(
|
||||
&self,
|
||||
mut con: ConnectionWithCredentialsProvider,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<Infallible> {
|
||||
let mut last_id = "0-0".to_string();
|
||||
loop {
|
||||
self.ready.store(false, Ordering::Release);
|
||||
if let Err(e) = con.connect().await {
|
||||
tracing::error!("error connecting to redis: {:?}", e);
|
||||
continue;
|
||||
self.ready.store(false, Ordering::Release);
|
||||
}
|
||||
if let Err(e) = self.read_from_stream(&mut con, &mut last_id).await {
|
||||
tracing::error!("error reading from redis: {:?}", e);
|
||||
self.ready.store(false, Ordering::Release);
|
||||
}
|
||||
if cancellation_token.is_cancelled() {
|
||||
info!("cancellation token is cancelled, exiting");
|
||||
tokio::time::sleep(Duration::from_secs(60 * 60 * 24 * 7)).await;
|
||||
// 1 week.
|
||||
}
|
||||
tokio::time::sleep(self.config.retry_interval).await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user