mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-03 16:30:38 +00:00
Compare commits
10 Commits
readonly-n
...
initdb-cac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6862a803b1 | ||
|
|
f03f7b3868 | ||
|
|
ec5dce04eb | ||
|
|
6014f15157 | ||
|
|
e675a21346 | ||
|
|
6b93230270 | ||
|
|
797aa4ffaa | ||
|
|
c45b56e0bb | ||
|
|
3104f0f250 | ||
|
|
f2c08195f0 |
6
.github/workflows/build_and_test.yml
vendored
6
.github/workflows/build_and_test.yml
vendored
@@ -159,6 +159,10 @@ jobs:
|
||||
# This will catch compiler & clippy warnings in all feature combinations.
|
||||
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
|
||||
# NB: keep clippy args in sync with ./run_clippy.sh
|
||||
#
|
||||
# The only difference between "clippy --debug" and "clippy --release" is that in --release mode,
|
||||
# #[cfg(debug_assertions)] blocks are not built. It's not worth building everything for second
|
||||
# time just for that, so skip "clippy --release".
|
||||
- run: |
|
||||
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
|
||||
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
|
||||
@@ -168,8 +172,6 @@ jobs:
|
||||
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
|
||||
- name: Run cargo clippy (debug)
|
||||
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
|
||||
- name: Run cargo clippy (release)
|
||||
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
|
||||
|
||||
- name: Check documentation generation
|
||||
run: cargo doc --workspace --no-deps --document-private-items
|
||||
|
||||
@@ -64,6 +64,7 @@ pub struct ConfigToml {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wal_redo_timeout: Duration,
|
||||
pub superuser: String,
|
||||
pub initdb_cache_dir: Option<Utf8PathBuf>,
|
||||
pub page_cache_size: usize,
|
||||
pub max_file_descriptors: usize,
|
||||
pub pg_distrib_dir: Option<Utf8PathBuf>,
|
||||
@@ -322,6 +323,7 @@ impl Default for ConfigToml {
|
||||
wal_redo_timeout: (humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
|
||||
.expect("cannot parse default wal redo timeout")),
|
||||
superuser: (DEFAULT_SUPERUSER.to_string()),
|
||||
initdb_cache_dir: None,
|
||||
page_cache_size: (DEFAULT_PAGE_CACHE_SIZE),
|
||||
max_file_descriptors: (DEFAULT_MAX_FILE_DESCRIPTORS),
|
||||
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
|
||||
|
||||
@@ -82,7 +82,7 @@ impl ApiError {
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
),
|
||||
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
err.to_string(),
|
||||
format!("{err:#}"), // use alternative formatting so that we give the cause without backtrace
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
),
|
||||
}
|
||||
|
||||
@@ -21,7 +21,13 @@
|
||||
//!
|
||||
//! Another explaination can be found here: <https://brandur.org/rate-limiting>
|
||||
|
||||
use std::{sync::Mutex, time::Duration};
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Mutex,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use tokio::{sync::Notify, time::Instant};
|
||||
|
||||
@@ -128,6 +134,7 @@ impl LeakyBucketState {
|
||||
|
||||
pub struct RateLimiter {
|
||||
pub config: LeakyBucketConfig,
|
||||
pub sleep_counter: AtomicU64,
|
||||
pub state: Mutex<LeakyBucketState>,
|
||||
/// a queue to provide this fair ordering.
|
||||
pub queue: Notify,
|
||||
@@ -144,6 +151,7 @@ impl Drop for Requeue<'_> {
|
||||
impl RateLimiter {
|
||||
pub fn with_initial_tokens(config: LeakyBucketConfig, initial_tokens: f64) -> Self {
|
||||
RateLimiter {
|
||||
sleep_counter: AtomicU64::new(0),
|
||||
state: Mutex::new(LeakyBucketState::with_initial_tokens(
|
||||
&config,
|
||||
initial_tokens,
|
||||
@@ -163,15 +171,16 @@ impl RateLimiter {
|
||||
|
||||
/// returns true if we did throttle
|
||||
pub async fn acquire(&self, count: usize) -> bool {
|
||||
let mut throttled = false;
|
||||
|
||||
let start = tokio::time::Instant::now();
|
||||
|
||||
let start_count = self.sleep_counter.load(Ordering::Acquire);
|
||||
let mut end_count = start_count;
|
||||
|
||||
// wait until we are the first in the queue
|
||||
let mut notified = std::pin::pin!(self.queue.notified());
|
||||
if !notified.as_mut().enable() {
|
||||
throttled = true;
|
||||
notified.await;
|
||||
end_count = self.sleep_counter.load(Ordering::Acquire);
|
||||
}
|
||||
|
||||
// notify the next waiter in the queue when we are done.
|
||||
@@ -184,9 +193,22 @@ impl RateLimiter {
|
||||
.unwrap()
|
||||
.add_tokens(&self.config, start, count as f64);
|
||||
match res {
|
||||
Ok(()) => return throttled,
|
||||
Ok(()) => return end_count > start_count,
|
||||
Err(ready_at) => {
|
||||
throttled = true;
|
||||
struct Increment<'a>(&'a AtomicU64);
|
||||
|
||||
impl Drop for Increment<'_> {
|
||||
fn drop(&mut self) {
|
||||
self.0.fetch_add(1, Ordering::AcqRel);
|
||||
}
|
||||
}
|
||||
|
||||
// increment the counter after we finish sleeping (or cancel this task).
|
||||
// this ensures that tasks that have already started the acquire will observe
|
||||
// the new sleep count when they are allowed to resume on the notify.
|
||||
let _inc = Increment(&self.sleep_counter);
|
||||
end_count += 1;
|
||||
|
||||
tokio::time::sleep_until(ready_at).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,6 +70,8 @@ pub struct PageServerConf {
|
||||
|
||||
pub superuser: String,
|
||||
|
||||
pub initdb_cache_dir: Option<Utf8PathBuf>,
|
||||
|
||||
pub page_cache_size: usize,
|
||||
pub max_file_descriptors: usize,
|
||||
|
||||
@@ -297,6 +299,7 @@ impl PageServerConf {
|
||||
wait_lsn_timeout,
|
||||
wal_redo_timeout,
|
||||
superuser,
|
||||
initdb_cache_dir,
|
||||
page_cache_size,
|
||||
max_file_descriptors,
|
||||
pg_distrib_dir,
|
||||
@@ -344,6 +347,7 @@ impl PageServerConf {
|
||||
wait_lsn_timeout,
|
||||
wal_redo_timeout,
|
||||
superuser,
|
||||
initdb_cache_dir,
|
||||
page_cache_size,
|
||||
max_file_descriptors,
|
||||
http_auth_type,
|
||||
|
||||
@@ -1177,10 +1177,10 @@ pub(crate) mod virtual_file_io_engine {
|
||||
}
|
||||
|
||||
struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
|
||||
global_metric: &'a Histogram,
|
||||
global_latency_histo: &'a Histogram,
|
||||
|
||||
// Optional because not all op types are tracked per-timeline
|
||||
timeline_metric: Option<&'a Histogram>,
|
||||
per_timeline_latency_histo: Option<&'a Histogram>,
|
||||
|
||||
ctx: &'c RequestContext,
|
||||
start: std::time::Instant,
|
||||
@@ -1212,9 +1212,10 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
|
||||
elapsed
|
||||
}
|
||||
};
|
||||
self.global_metric.observe(ex_throttled.as_secs_f64());
|
||||
if let Some(timeline_metric) = self.timeline_metric {
|
||||
timeline_metric.observe(ex_throttled.as_secs_f64());
|
||||
self.global_latency_histo
|
||||
.observe(ex_throttled.as_secs_f64());
|
||||
if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo {
|
||||
per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1240,10 +1241,32 @@ pub enum SmgrQueryType {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
global_metrics: [Histogram; SmgrQueryType::COUNT],
|
||||
per_timeline_getpage: Histogram,
|
||||
global_started: [IntCounter; SmgrQueryType::COUNT],
|
||||
global_latency: [Histogram; SmgrQueryType::COUNT],
|
||||
per_timeline_getpage_started: IntCounter,
|
||||
per_timeline_getpage_latency: Histogram,
|
||||
}
|
||||
|
||||
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
|
||||
"pageserver_smgr_query_started_global_count",
|
||||
"Number of smgr queries started, aggregated by query type.",
|
||||
&["smgr_query_type"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static SMGR_QUERY_STARTED_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
|
||||
"pageserver_smgr_query_started_count",
|
||||
"Number of smgr queries started, aggregated by query type and tenant/timeline.",
|
||||
&["smgr_query_type", "tenant_id", "shard_id", "timeline_id"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_smgr_query_seconds",
|
||||
@@ -1319,14 +1342,20 @@ impl SmgrQueryTimePerTimeline {
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let shard_slug = format!("{}", tenant_shard_id.shard_slug());
|
||||
let timeline_id = timeline_id.to_string();
|
||||
let global_metrics = std::array::from_fn(|i| {
|
||||
let global_started = std::array::from_fn(|i| {
|
||||
let op = SmgrQueryType::from_repr(i).unwrap();
|
||||
SMGR_QUERY_STARTED_GLOBAL
|
||||
.get_metric_with_label_values(&[op.into()])
|
||||
.unwrap()
|
||||
});
|
||||
let global_latency = std::array::from_fn(|i| {
|
||||
let op = SmgrQueryType::from_repr(i).unwrap();
|
||||
SMGR_QUERY_TIME_GLOBAL
|
||||
.get_metric_with_label_values(&[op.into()])
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let per_timeline_getpage = SMGR_QUERY_TIME_PER_TENANT_TIMELINE
|
||||
let per_timeline_getpage_started = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE
|
||||
.get_metric_with_label_values(&[
|
||||
SmgrQueryType::GetPageAtLsn.into(),
|
||||
&tenant_id,
|
||||
@@ -1334,9 +1363,20 @@ impl SmgrQueryTimePerTimeline {
|
||||
&timeline_id,
|
||||
])
|
||||
.unwrap();
|
||||
let per_timeline_getpage_latency = SMGR_QUERY_TIME_PER_TENANT_TIMELINE
|
||||
.get_metric_with_label_values(&[
|
||||
SmgrQueryType::GetPageAtLsn.into(),
|
||||
&tenant_id,
|
||||
&shard_slug,
|
||||
&timeline_id,
|
||||
])
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
global_metrics,
|
||||
per_timeline_getpage,
|
||||
global_started,
|
||||
global_latency,
|
||||
per_timeline_getpage_latency,
|
||||
per_timeline_getpage_started,
|
||||
}
|
||||
}
|
||||
pub(crate) fn start_timer<'c: 'a, 'a>(
|
||||
@@ -1344,8 +1384,11 @@ impl SmgrQueryTimePerTimeline {
|
||||
op: SmgrQueryType,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Option<impl Drop + '_> {
|
||||
let global_metric = &self.global_metrics[op as usize];
|
||||
let start = Instant::now();
|
||||
|
||||
self.global_started[op as usize].inc();
|
||||
|
||||
// We subtract time spent throttled from the observed latency.
|
||||
match ctx.micros_spent_throttled.open() {
|
||||
Ok(()) => (),
|
||||
Err(error) => {
|
||||
@@ -1364,15 +1407,16 @@ impl SmgrQueryTimePerTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
let timeline_metric = if matches!(op, SmgrQueryType::GetPageAtLsn) {
|
||||
Some(&self.per_timeline_getpage)
|
||||
let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) {
|
||||
self.per_timeline_getpage_started.inc();
|
||||
Some(&self.per_timeline_getpage_latency)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Some(GlobalAndPerTimelineHistogramTimer {
|
||||
global_metric,
|
||||
timeline_metric,
|
||||
global_latency_histo: &self.global_latency[op as usize],
|
||||
per_timeline_latency_histo,
|
||||
ctx,
|
||||
start,
|
||||
op,
|
||||
@@ -1423,9 +1467,12 @@ mod smgr_query_time_tests {
|
||||
let get_counts = || {
|
||||
let global: u64 = ops
|
||||
.iter()
|
||||
.map(|op| metrics.global_metrics[*op as usize].get_sample_count())
|
||||
.map(|op| metrics.global_latency[*op as usize].get_sample_count())
|
||||
.sum();
|
||||
(global, metrics.per_timeline_getpage.get_sample_count())
|
||||
(
|
||||
global,
|
||||
metrics.per_timeline_getpage_latency.get_sample_count(),
|
||||
)
|
||||
};
|
||||
|
||||
let (pre_global, pre_per_tenant_timeline) = get_counts();
|
||||
@@ -2576,6 +2623,12 @@ impl TimelineMetrics {
|
||||
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
|
||||
}
|
||||
|
||||
let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
|
||||
SmgrQueryType::GetPageAtLsn.into(),
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
let _ = SMGR_QUERY_TIME_PER_TENANT_TIMELINE.remove_label_values(&[
|
||||
SmgrQueryType::GetPageAtLsn.into(),
|
||||
tenant_id,
|
||||
@@ -2592,6 +2645,8 @@ pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
|
||||
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
|
||||
}
|
||||
|
||||
tenant_throttling::remove_tenant_metrics(tenant_shard_id);
|
||||
|
||||
// we leave the BROKEN_TENANTS_SET entry if any
|
||||
}
|
||||
|
||||
@@ -3055,41 +3110,180 @@ pub mod tokio_epoll_uring {
|
||||
pub(crate) mod tenant_throttling {
|
||||
use metrics::{register_int_counter_vec, IntCounter};
|
||||
use once_cell::sync::Lazy;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use crate::tenant::{self, throttle::Metric};
|
||||
|
||||
pub(crate) struct TimelineGet {
|
||||
wait_time: IntCounter,
|
||||
count: IntCounter,
|
||||
struct GlobalAndPerTenantIntCounter {
|
||||
global: IntCounter,
|
||||
per_tenant: IntCounter,
|
||||
}
|
||||
|
||||
pub(crate) static TIMELINE_GET: Lazy<TimelineGet> = Lazy::new(|| {
|
||||
static WAIT_USECS: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_wait_usecs_sum_global",
|
||||
"Sum of microseconds that tenants spent waiting for a tenant throttle of a given kind.",
|
||||
impl GlobalAndPerTenantIntCounter {
|
||||
#[inline(always)]
|
||||
pub(crate) fn inc(&self) {
|
||||
self.inc_by(1)
|
||||
}
|
||||
#[inline(always)]
|
||||
pub(crate) fn inc_by(&self, n: u64) {
|
||||
self.global.inc_by(n);
|
||||
self.per_tenant.inc_by(n);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct TimelineGet {
|
||||
count_accounted_start: GlobalAndPerTenantIntCounter,
|
||||
count_accounted_finish: GlobalAndPerTenantIntCounter,
|
||||
wait_time: GlobalAndPerTenantIntCounter,
|
||||
count_throttled: GlobalAndPerTenantIntCounter,
|
||||
}
|
||||
|
||||
static COUNT_ACCOUNTED_START: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_count_accounted_start_global",
|
||||
"Count of tenant throttling starts, by kind of throttle.",
|
||||
&["kind"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static WAIT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_count_global",
|
||||
"Count of tenant throttlings, by kind of throttle.",
|
||||
&["kind"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let kind = "timeline_get";
|
||||
TimelineGet {
|
||||
wait_time: WAIT_USECS.with_label_values(&[kind]),
|
||||
count: WAIT_COUNT.with_label_values(&[kind]),
|
||||
}
|
||||
.unwrap()
|
||||
});
|
||||
static COUNT_ACCOUNTED_START_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_count_accounted_start",
|
||||
"Count of tenant throttling starts, by kind of throttle.",
|
||||
&["kind", "tenant_id", "shard_id"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
static COUNT_ACCOUNTED_FINISH: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_count_accounted_finish_global",
|
||||
"Count of tenant throttling finishes, by kind of throttle.",
|
||||
&["kind"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
static COUNT_ACCOUNTED_FINISH_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_count_accounted_finish",
|
||||
"Count of tenant throttling finishes, by kind of throttle.",
|
||||
&["kind", "tenant_id", "shard_id"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
static WAIT_USECS: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_wait_usecs_sum_global",
|
||||
"Sum of microseconds that spent waiting throttle by kind of throttle.",
|
||||
&["kind"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
static WAIT_USECS_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_wait_usecs_sum",
|
||||
"Sum of microseconds that spent waiting throttle by kind of throttle.",
|
||||
&["kind", "tenant_id", "shard_id"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
impl Metric for &'static TimelineGet {
|
||||
static WAIT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_count_global",
|
||||
"Count of tenant throttlings, by kind of throttle.",
|
||||
&["kind"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
static WAIT_COUNT_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_count",
|
||||
"Count of tenant throttlings, by kind of throttle.",
|
||||
&["kind", "tenant_id", "shard_id"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
const KIND: &str = "timeline_get";
|
||||
|
||||
impl TimelineGet {
|
||||
pub(crate) fn new(tenant_shard_id: &TenantShardId) -> Self {
|
||||
TimelineGet {
|
||||
count_accounted_start: {
|
||||
GlobalAndPerTenantIntCounter {
|
||||
global: COUNT_ACCOUNTED_START.with_label_values(&[KIND]),
|
||||
per_tenant: COUNT_ACCOUNTED_START_PER_TENANT.with_label_values(&[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
]),
|
||||
}
|
||||
},
|
||||
count_accounted_finish: {
|
||||
GlobalAndPerTenantIntCounter {
|
||||
global: COUNT_ACCOUNTED_FINISH.with_label_values(&[KIND]),
|
||||
per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT.with_label_values(&[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
]),
|
||||
}
|
||||
},
|
||||
wait_time: {
|
||||
GlobalAndPerTenantIntCounter {
|
||||
global: WAIT_USECS.with_label_values(&[KIND]),
|
||||
per_tenant: WAIT_USECS_PER_TENANT.with_label_values(&[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
]),
|
||||
}
|
||||
},
|
||||
count_throttled: {
|
||||
GlobalAndPerTenantIntCounter {
|
||||
global: WAIT_COUNT.with_label_values(&[KIND]),
|
||||
per_tenant: WAIT_COUNT_PER_TENANT.with_label_values(&[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
]),
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn preinitialize_global_metrics() {
|
||||
Lazy::force(&COUNT_ACCOUNTED_START);
|
||||
Lazy::force(&COUNT_ACCOUNTED_FINISH);
|
||||
Lazy::force(&WAIT_USECS);
|
||||
Lazy::force(&WAIT_COUNT);
|
||||
}
|
||||
|
||||
pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
|
||||
for m in &[
|
||||
&COUNT_ACCOUNTED_START_PER_TENANT,
|
||||
&COUNT_ACCOUNTED_FINISH_PER_TENANT,
|
||||
&WAIT_USECS_PER_TENANT,
|
||||
&WAIT_COUNT_PER_TENANT,
|
||||
] {
|
||||
let _ = m.remove_label_values(&[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
impl Metric for TimelineGet {
|
||||
#[inline(always)]
|
||||
fn accounting_start(&self) {
|
||||
self.count_accounted_start.inc();
|
||||
}
|
||||
#[inline(always)]
|
||||
fn accounting_finish(&self) {
|
||||
self.count_accounted_finish.inc();
|
||||
}
|
||||
#[inline(always)]
|
||||
fn observe_throttling(
|
||||
&self,
|
||||
@@ -3097,7 +3291,7 @@ pub(crate) mod tenant_throttling {
|
||||
) {
|
||||
let val = u64::try_from(wait_time.as_micros()).unwrap();
|
||||
self.wait_time.inc_by(val);
|
||||
self.count.inc();
|
||||
self.count_throttled.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3227,11 +3421,14 @@ pub fn preinitialize_metrics() {
|
||||
}
|
||||
|
||||
// countervecs
|
||||
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
|
||||
.into_iter()
|
||||
.for_each(|c| {
|
||||
Lazy::force(c);
|
||||
});
|
||||
[
|
||||
&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT,
|
||||
&SMGR_QUERY_STARTED_GLOBAL,
|
||||
]
|
||||
.into_iter()
|
||||
.for_each(|c| {
|
||||
Lazy::force(c);
|
||||
});
|
||||
|
||||
// gauges
|
||||
WALRECEIVER_ACTIVE_MANAGERS.get();
|
||||
@@ -3253,7 +3450,8 @@ pub fn preinitialize_metrics() {
|
||||
|
||||
// Custom
|
||||
Lazy::force(&RECONSTRUCT_TIME);
|
||||
Lazy::force(&tenant_throttling::TIMELINE_GET);
|
||||
Lazy::force(&BASEBACKUP_QUERY_TIME);
|
||||
Lazy::force(&COMPUTE_COMMANDS_COUNTERS);
|
||||
|
||||
tenant_throttling::preinitialize_global_metrics();
|
||||
}
|
||||
|
||||
@@ -302,7 +302,7 @@ pub struct Tenant {
|
||||
/// Throttle applied at the top of [`Timeline::get`].
|
||||
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
|
||||
pub(crate) timeline_get_throttle:
|
||||
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
|
||||
Arc<throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
|
||||
|
||||
/// An ongoing timeline detach concurrency limiter.
|
||||
///
|
||||
@@ -2831,7 +2831,7 @@ impl Tenant {
|
||||
gate: Gate::default(),
|
||||
timeline_get_throttle: Arc::new(throttle::Throttle::new(
|
||||
Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf),
|
||||
&crate::metrics::tenant_throttling::TIMELINE_GET,
|
||||
crate::metrics::tenant_throttling::TimelineGet::new(&tenant_shard_id),
|
||||
)),
|
||||
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
|
||||
ongoing_timeline_detach: std::sync::Mutex::default(),
|
||||
@@ -3522,7 +3522,7 @@ impl Tenant {
|
||||
.context("extract initdb tar")?;
|
||||
} else {
|
||||
// Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
|
||||
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
|
||||
run_initdb_with_cache(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
|
||||
|
||||
// Upload the created data dir to S3
|
||||
if self.tenant_shard_id().is_shard_zero() {
|
||||
@@ -3868,6 +3868,118 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
fn cached_initdb_dirname(initial_superuser_name: &str, pg_version: u32) -> String
|
||||
{
|
||||
use std::hash::Hash;
|
||||
use std::hash::Hasher;
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
let mut hasher = DefaultHasher::new();
|
||||
initial_superuser_name.hash(&mut hasher);
|
||||
let hash = hasher.finish();
|
||||
|
||||
format!("cached_initial_pgdata_{pg_version}_{:016}", hash)
|
||||
}
|
||||
|
||||
fn copy_dir_all(src: impl AsRef<std::path::Path>, dst: impl AsRef<std::path::Path>) -> std::io::Result<()> {
|
||||
for entry in fs::read_dir(src.as_ref())? {
|
||||
let entry = entry?;
|
||||
let subsrc = entry.path();
|
||||
let subdst = dst.as_ref().join(&entry.file_name());
|
||||
|
||||
if entry.file_type()?.is_dir() {
|
||||
std::fs::create_dir(&subdst)?;
|
||||
copy_dir_all(&subsrc, &subdst)?;
|
||||
} else {
|
||||
std::fs::copy(&subsrc, &subdst)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn restore_cached_initdb_dir(
|
||||
cached_path: &Utf8Path,
|
||||
target_path: &Utf8Path,
|
||||
) -> anyhow::Result<bool> {
|
||||
if !cached_path.exists() {
|
||||
info!("cached initdb dir \"{cached_path}\" does not exist yet");
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
std::fs::create_dir(target_path)?;
|
||||
copy_dir_all(cached_path, target_path)?;
|
||||
info!("restored initdb result from cache dir \"{cached_path}\"");
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn save_cached_initdb_dir(
|
||||
src_path: &Utf8Path,
|
||||
cache_path: &Utf8Path,
|
||||
) -> anyhow::Result<()> {
|
||||
match std::fs::create_dir(cache_path) {
|
||||
Ok(()) => {
|
||||
info!("saving initdb result to cache dir \"{cache_path}\"");
|
||||
},
|
||||
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
|
||||
info!("cache initdb dir \"{cache_path}\" already exists, not saving");
|
||||
return Ok(())
|
||||
},
|
||||
Err(err) => { return Err(anyhow::Error::from(err))},
|
||||
};
|
||||
let cache_dir_guard = scopeguard::guard(cache_path, |cp| {
|
||||
if let Err(err) = std::fs::remove_dir_all(&cp) {
|
||||
error!("could not remove cached initdb directory {cp}: {err}");
|
||||
}
|
||||
});
|
||||
|
||||
let cache_parent_path = cache_path.parent().ok_or(anyhow::Error::msg("no cache parent path"))?;
|
||||
|
||||
let tmp_dirpath = camino_tempfile::tempdir_in(cache_parent_path)?;
|
||||
copy_dir_all(src_path, &tmp_dirpath)?;
|
||||
std::fs::rename(tmp_dirpath, &*cache_dir_guard)?;
|
||||
|
||||
// disarm the guard
|
||||
scopeguard::ScopeGuard::into_inner(cache_dir_guard);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_initdb_with_cache(
|
||||
conf: &'static PageServerConf,
|
||||
initdb_target_dir: &Utf8Path,
|
||||
pg_version: u32,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), InitdbError> {
|
||||
|
||||
let cache_dir = conf.initdb_cache_dir.as_ref().map(|initdb_cache_dir| {
|
||||
initdb_cache_dir.join(cached_initdb_dirname(&conf.superuser, pg_version))
|
||||
});
|
||||
|
||||
if let Some(cache_dir) = &cache_dir {
|
||||
match restore_cached_initdb_dir(&cache_dir, initdb_target_dir) {
|
||||
Ok(true) => return Ok(()),
|
||||
Ok(false) => {},
|
||||
Err(err) => {
|
||||
warn!("Error restoring from cached initdb directory \"{cache_dir}\": {err}");
|
||||
if initdb_target_dir.exists() {
|
||||
if let Err(err) = std::fs::remove_dir_all(&initdb_target_dir) {
|
||||
error!("could not remove temporary initdb target directory {initdb_target_dir}: {err}");
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
run_initdb(conf, initdb_target_dir, pg_version, cancel).await?;
|
||||
|
||||
if let Some(cache_dir) = &cache_dir {
|
||||
if let Err(err) = save_cached_initdb_dir(initdb_target_dir, &cache_dir) {
|
||||
warn!("error saving initdb result to cache directory \"{cache_dir}\": {err}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
|
||||
/// to get bootstrap data for timeline initialization.
|
||||
async fn run_initdb(
|
||||
|
||||
@@ -439,11 +439,30 @@ impl Layer {
|
||||
|
||||
fn record_access(&self, ctx: &RequestContext) {
|
||||
if self.0.access_stats.record_access(ctx) {
|
||||
// Visibility was modified to Visible
|
||||
tracing::info!(
|
||||
"Layer {} became visible as a result of access",
|
||||
self.0.desc.key()
|
||||
);
|
||||
// Visibility was modified to Visible: maybe log about this
|
||||
match ctx.task_kind() {
|
||||
TaskKind::CalculateSyntheticSize
|
||||
| TaskKind::GarbageCollector
|
||||
| TaskKind::MgmtRequest => {
|
||||
// This situation is expected in code paths do binary searches of the LSN space to resolve
|
||||
// an LSN to a timestamp, which happens during GC, during GC cutoff calculations in synthetic size,
|
||||
// and on-demand for certain HTTP API requests.
|
||||
}
|
||||
_ => {
|
||||
// In all other contexts, it is unusual to do I/O involving layers which are not visible at
|
||||
// some branch tip, so we log the fact that we are accessing something that the visibility
|
||||
// calculation thought should not be visible.
|
||||
//
|
||||
// This case is legal in brief time windows: for example an in-flight getpage request can hold on to a layer object
|
||||
// which was covered by a concurrent compaction.
|
||||
tracing::info!(
|
||||
"Layer {} became visible as a result of access",
|
||||
self.0.desc.key()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Update the timeline's visible bytes count
|
||||
if let Some(tl) = self.0.timeline.upgrade() {
|
||||
tl.metrics
|
||||
.visible_physical_size_gauge
|
||||
|
||||
@@ -1025,6 +1025,15 @@ fn access_stats() {
|
||||
assert_eq!(access_stats.latest_activity(), lowres_time(atime));
|
||||
access_stats.set_visibility(LayerVisibilityHint::Visible);
|
||||
assert_eq!(access_stats.latest_activity(), lowres_time(atime));
|
||||
|
||||
// Recording access implicitly makes layer visible, if it wasn't already
|
||||
let atime = UNIX_EPOCH + Duration::from_secs(2200000000);
|
||||
access_stats.set_visibility(LayerVisibilityHint::Covered);
|
||||
assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
|
||||
assert!(access_stats.record_access_at(atime));
|
||||
access_stats.set_visibility(LayerVisibilityHint::Visible);
|
||||
assert!(!access_stats.record_access_at(atime));
|
||||
access_stats.set_visibility(LayerVisibilityHint::Visible);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -163,8 +163,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
// How many errors we have seen consequtively
|
||||
let mut error_run_count = 0;
|
||||
|
||||
let mut last_throttle_flag_reset_at = Instant::now();
|
||||
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
@@ -191,8 +189,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
let sleep_duration;
|
||||
if period == Duration::ZERO {
|
||||
#[cfg(not(feature = "testing"))]
|
||||
@@ -207,12 +203,18 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
};
|
||||
|
||||
// Run compaction
|
||||
let IterationResult { output, elapsed } = iteration.run(tenant.compaction_iteration(&cancel, &ctx)).await;
|
||||
let IterationResult { output, elapsed } = iteration
|
||||
.run(tenant.compaction_iteration(&cancel, &ctx))
|
||||
.await;
|
||||
match output {
|
||||
Ok(has_pending_task) => {
|
||||
error_run_count = 0;
|
||||
// schedule the next compaction immediately in case there is a pending compaction task
|
||||
sleep_duration = if has_pending_task { Duration::ZERO } else { period };
|
||||
sleep_duration = if has_pending_task {
|
||||
Duration::ZERO
|
||||
} else {
|
||||
period
|
||||
};
|
||||
}
|
||||
Err(e) => {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
@@ -233,38 +235,20 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
}
|
||||
|
||||
// the duration is recorded by performance tests by enabling debug in this function
|
||||
tracing::debug!(elapsed_ms=elapsed.as_millis(), "compaction iteration complete");
|
||||
tracing::debug!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"compaction iteration complete"
|
||||
);
|
||||
};
|
||||
|
||||
|
||||
// Perhaps we did no work and the walredo process has been idle for some time:
|
||||
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
|
||||
// TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off,
|
||||
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
|
||||
if let Some(walredo_mgr) = &tenant.walredo_mgr {
|
||||
walredo_mgr.maybe_quiesce(period * 10);
|
||||
}
|
||||
|
||||
// TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off,
|
||||
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
|
||||
info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
|
||||
let now = Instant::now();
|
||||
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
|
||||
let Stats { count_accounted, count_throttled, sum_throttled_usecs } = tenant.timeline_get_throttle.reset_stats();
|
||||
if count_throttled == 0 {
|
||||
return;
|
||||
}
|
||||
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
|
||||
let delta = now - prev;
|
||||
info!(
|
||||
n_seconds=%format_args!("{:.3}",
|
||||
delta.as_secs_f64()),
|
||||
count_accounted,
|
||||
count_throttled,
|
||||
sum_throttled_usecs,
|
||||
allowed_rps=%format_args!("{allowed_rps:.0}"),
|
||||
"shard was throttled in the last n_seconds"
|
||||
);
|
||||
});
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
.await
|
||||
@@ -437,6 +421,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let mut last_throttle_flag_reset_at = Instant::now();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
@@ -483,6 +468,29 @@ async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken
|
||||
kind: BackgroundLoopKind::IngestHouseKeeping,
|
||||
};
|
||||
iteration.run(tenant.ingest_housekeeping()).await;
|
||||
|
||||
// TODO: rename the background loop kind to something more generic, like, tenant housekeeping.
|
||||
// Or just spawn another background loop for this throttle, it's not like it's super costly.
|
||||
info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
|
||||
let now = Instant::now();
|
||||
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
|
||||
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.timeline_get_throttle.reset_stats();
|
||||
if count_throttled == 0 {
|
||||
return;
|
||||
}
|
||||
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
|
||||
let delta = now - prev;
|
||||
info!(
|
||||
n_seconds=%format_args!("{:.3}",
|
||||
delta.as_secs_f64()),
|
||||
count_accounted = count_accounted_finish, // don't break existing log scraping
|
||||
count_throttled,
|
||||
sum_throttled_usecs,
|
||||
count_accounted_start, // log after pre-existing fields to not break existing log scraping
|
||||
allowed_rps=%format_args!("{allowed_rps:.0}"),
|
||||
"shard was throttled in the last n_seconds"
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
.await;
|
||||
|
||||
@@ -24,8 +24,10 @@ use crate::{context::RequestContext, task_mgr::TaskKind};
|
||||
pub struct Throttle<M: Metric> {
|
||||
inner: ArcSwap<Inner>,
|
||||
metric: M,
|
||||
/// will be turned into [`Stats::count_accounted`]
|
||||
count_accounted: AtomicU64,
|
||||
/// will be turned into [`Stats::count_accounted_start`]
|
||||
count_accounted_start: AtomicU64,
|
||||
/// will be turned into [`Stats::count_accounted_finish`]
|
||||
count_accounted_finish: AtomicU64,
|
||||
/// will be turned into [`Stats::count_throttled`]
|
||||
count_throttled: AtomicU64,
|
||||
/// will be turned into [`Stats::sum_throttled_usecs`]
|
||||
@@ -43,17 +45,21 @@ pub struct Observation {
|
||||
pub wait_time: Duration,
|
||||
}
|
||||
pub trait Metric {
|
||||
fn accounting_start(&self);
|
||||
fn accounting_finish(&self);
|
||||
fn observe_throttling(&self, observation: &Observation);
|
||||
}
|
||||
|
||||
/// See [`Throttle::reset_stats`].
|
||||
pub struct Stats {
|
||||
// Number of requests that were subject to throttling, i.e., requests of the configured [`Config::task_kinds`].
|
||||
pub count_accounted: u64,
|
||||
// Subset of the `accounted` requests that were actually throttled.
|
||||
// Note that the numbers are stored as two independent atomics, so, there might be a slight drift.
|
||||
/// Number of requests that started [`Throttle::throttle`] calls.
|
||||
pub count_accounted_start: u64,
|
||||
/// Number of requests that finished [`Throttle::throttle`] calls.
|
||||
pub count_accounted_finish: u64,
|
||||
/// Subset of the `accounted` requests that were actually throttled.
|
||||
/// Note that the numbers are stored as two independent atomics, so, there might be a slight drift.
|
||||
pub count_throttled: u64,
|
||||
// Sum of microseconds that throttled requests spent waiting for throttling.
|
||||
/// Sum of microseconds that throttled requests spent waiting for throttling.
|
||||
pub sum_throttled_usecs: u64,
|
||||
}
|
||||
|
||||
@@ -65,7 +71,8 @@ where
|
||||
Self {
|
||||
inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
|
||||
metric,
|
||||
count_accounted: AtomicU64::new(0),
|
||||
count_accounted_start: AtomicU64::new(0),
|
||||
count_accounted_finish: AtomicU64::new(0),
|
||||
count_throttled: AtomicU64::new(0),
|
||||
sum_throttled_usecs: AtomicU64::new(0),
|
||||
}
|
||||
@@ -117,11 +124,13 @@ where
|
||||
/// This method allows retrieving & resetting that flag.
|
||||
/// Useful for periodic reporting.
|
||||
pub fn reset_stats(&self) -> Stats {
|
||||
let count_accounted = self.count_accounted.swap(0, Ordering::Relaxed);
|
||||
let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
|
||||
let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
|
||||
let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
|
||||
let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
|
||||
Stats {
|
||||
count_accounted,
|
||||
count_accounted_start,
|
||||
count_accounted_finish,
|
||||
count_throttled,
|
||||
sum_throttled_usecs,
|
||||
}
|
||||
@@ -139,9 +148,12 @@ where
|
||||
};
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
self.metric.accounting_start();
|
||||
self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
|
||||
let did_throttle = inner.rate_limiter.acquire(key_count).await;
|
||||
self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
|
||||
self.metric.accounting_finish();
|
||||
|
||||
self.count_accounted.fetch_add(1, Ordering::Relaxed);
|
||||
if did_throttle {
|
||||
self.count_throttled.fetch_add(1, Ordering::Relaxed);
|
||||
let now = Instant::now();
|
||||
|
||||
@@ -196,9 +196,8 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
|
||||
/// The outward-facing resources required to build a Timeline
|
||||
pub struct TimelineResources {
|
||||
pub remote_client: RemoteTimelineClient,
|
||||
pub timeline_get_throttle: Arc<
|
||||
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
|
||||
>,
|
||||
pub timeline_get_throttle:
|
||||
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
}
|
||||
|
||||
@@ -406,9 +405,8 @@ pub struct Timeline {
|
||||
gc_lock: tokio::sync::Mutex<()>,
|
||||
|
||||
/// Cloned from [`super::Tenant::timeline_get_throttle`] on construction.
|
||||
timeline_get_throttle: Arc<
|
||||
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
|
||||
>,
|
||||
timeline_get_throttle:
|
||||
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
|
||||
|
||||
/// Keep aux directory cache to avoid it's reconstruction on each update
|
||||
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
|
||||
@@ -4316,7 +4314,9 @@ impl Timeline {
|
||||
timer.stop_and_record();
|
||||
|
||||
// Creating image layers may have caused some previously visible layers to be covered
|
||||
self.update_layer_visibility().await?;
|
||||
if !image_layers.is_empty() {
|
||||
self.update_layer_visibility().await?;
|
||||
}
|
||||
|
||||
Ok(image_layers)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
openapi: "3.0.2"
|
||||
info:
|
||||
title: Safekeeper control API
|
||||
description: Neon Safekeeper API
|
||||
version: "1.0"
|
||||
license:
|
||||
name: "Apache"
|
||||
url: https://github.com/neondatabase/neon/blob/main/LICENSE
|
||||
|
||||
|
||||
servers:
|
||||
@@ -386,6 +390,12 @@ components:
|
||||
msg:
|
||||
type: string
|
||||
|
||||
NotFoundError:
|
||||
type: object
|
||||
properties:
|
||||
msg:
|
||||
type: string
|
||||
|
||||
responses:
|
||||
|
||||
#
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::{
|
||||
borrow::Cow,
|
||||
cmp::Ordering,
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
error::Error,
|
||||
ops::Deref,
|
||||
path::PathBuf,
|
||||
str::FromStr,
|
||||
@@ -218,9 +219,16 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
|
||||
format!("{node} error receiving error body: {str}").into(),
|
||||
)
|
||||
}
|
||||
mgmt_api::Error::ReceiveBody(str) => {
|
||||
// Presume errors receiving body are connectivity/availability issues
|
||||
ApiError::ResourceUnavailable(format!("{node} error receiving body: {str}").into())
|
||||
mgmt_api::Error::ReceiveBody(err) if err.is_decode() => {
|
||||
// Return 500 for decoding errors.
|
||||
ApiError::InternalServerError(anyhow::Error::from(err).context("error decoding body"))
|
||||
}
|
||||
mgmt_api::Error::ReceiveBody(err) => {
|
||||
// Presume errors receiving body are connectivity/availability issues except for decoding errors
|
||||
let src_str = err.source().map(|e| e.to_string()).unwrap_or_default();
|
||||
ApiError::ResourceUnavailable(
|
||||
format!("{node} error receiving error body: {err} {}", src_str).into(),
|
||||
)
|
||||
}
|
||||
mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, msg) => {
|
||||
ApiError::NotFound(anyhow::anyhow!(format!("{node}: {msg}")).into())
|
||||
|
||||
@@ -102,6 +102,11 @@ def histogram(prefix_without_trailing_underscore: str) -> List[str]:
|
||||
return [f"{prefix_without_trailing_underscore}_{x}" for x in ["bucket", "count", "sum"]]
|
||||
|
||||
|
||||
def counter(name: str) -> str:
|
||||
# the prometheus_client package appends _total to all counters client-side
|
||||
return f"{name}_total"
|
||||
|
||||
|
||||
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_remote_timeline_client_calls_started_total",
|
||||
"pageserver_remote_timeline_client_calls_finished_total",
|
||||
@@ -132,9 +137,14 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
*histogram("pageserver_wait_lsn_seconds"),
|
||||
*histogram("pageserver_remote_operation_seconds"),
|
||||
*histogram("pageserver_io_operations_seconds"),
|
||||
"pageserver_smgr_query_started_global_count_total",
|
||||
"pageserver_tenant_states_count",
|
||||
"pageserver_circuit_breaker_broken_total",
|
||||
"pageserver_circuit_breaker_unbroken_total",
|
||||
counter("pageserver_tenant_throttling_count_accounted_start_global"),
|
||||
counter("pageserver_tenant_throttling_count_accounted_finish_global"),
|
||||
counter("pageserver_tenant_throttling_wait_usecs_sum_global"),
|
||||
counter("pageserver_tenant_throttling_count_global"),
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
@@ -146,6 +156,7 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_smgr_query_seconds_bucket",
|
||||
"pageserver_smgr_query_seconds_count",
|
||||
"pageserver_smgr_query_seconds_sum",
|
||||
"pageserver_smgr_query_started_count_total",
|
||||
"pageserver_archive_size",
|
||||
"pageserver_pitr_history_size",
|
||||
"pageserver_layer_bytes",
|
||||
@@ -157,6 +168,10 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_evictions_with_low_residence_duration_total",
|
||||
"pageserver_aux_file_estimated_size",
|
||||
"pageserver_valid_lsn_lease_count",
|
||||
counter("pageserver_tenant_throttling_count_accounted_start"),
|
||||
counter("pageserver_tenant_throttling_count_accounted_finish"),
|
||||
counter("pageserver_tenant_throttling_wait_usecs_sum"),
|
||||
counter("pageserver_tenant_throttling_count"),
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
# "pageserver_directory_entries_count", -- only used if above a certain threshold
|
||||
# "pageserver_broken_tenants_count" -- used only for broken
|
||||
|
||||
@@ -181,6 +181,17 @@ def top_output_dir(base_dir: Path) -> Iterator[Path]:
|
||||
log.info(f"top_output_dir is {output_dir}")
|
||||
yield output_dir
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def shared_initdb_cache_dir(top_output_dir: Path) -> Iterator[Path]:
|
||||
log.info("Creating shared initdb cache directory")
|
||||
|
||||
cache_dir = top_output_dir / "shared_initdb_cache"
|
||||
|
||||
shutil.rmtree(cache_dir, ignore_errors=True)
|
||||
cache_dir.mkdir(exist_ok=True)
|
||||
|
||||
yield cache_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api_key() -> str:
|
||||
@@ -401,6 +412,7 @@ class NeonEnvBuilder:
|
||||
safekeeper_extra_opts: Optional[list[str]] = None,
|
||||
storage_controller_port_override: Optional[int] = None,
|
||||
pageserver_io_buffer_alignment: Optional[int] = None,
|
||||
shared_initdb_cache_dir: Optional[Path] = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -432,6 +444,7 @@ class NeonEnvBuilder:
|
||||
self.enable_scrub_on_exit = True
|
||||
self.test_output_dir = test_output_dir
|
||||
self.test_overlay_dir = test_overlay_dir
|
||||
self.shared_initdb_cache_dir = shared_initdb_cache_dir
|
||||
self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = []
|
||||
self.config_init_force: Optional[str] = None
|
||||
self.top_output_dir = top_output_dir
|
||||
@@ -969,6 +982,7 @@ class NeonEnv:
|
||||
|
||||
def __init__(self, config: NeonEnvBuilder):
|
||||
self.repo_dir = config.repo_dir
|
||||
self.shared_initdb_cache_dir = config.shared_initdb_cache_dir
|
||||
self.rust_log_override = config.rust_log_override
|
||||
self.port_distributor = config.port_distributor
|
||||
self.s3_mock_server = config.mock_s3_server
|
||||
@@ -1071,6 +1085,10 @@ class NeonEnv:
|
||||
# Default which can be overriden with `NeonEnvBuilder.pageserver_config_override`
|
||||
"availability_zone": "us-east-2a",
|
||||
}
|
||||
|
||||
if self.shared_initdb_cache_dir is not None:
|
||||
ps_cfg["initdb_cache_dir"] = str(self.shared_initdb_cache_dir)
|
||||
|
||||
if self.pageserver_virtual_file_io_engine is not None:
|
||||
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
|
||||
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
|
||||
@@ -1333,6 +1351,7 @@ def neon_simple_env(
|
||||
pageserver_aux_file_policy: Optional[AuxFileStore],
|
||||
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
|
||||
pageserver_io_buffer_alignment: Optional[int],
|
||||
shared_initdb_cache_dir: Optional[Path],
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
Simple Neon environment, with no authentication and no safekeepers.
|
||||
@@ -1359,6 +1378,7 @@ def neon_simple_env(
|
||||
pageserver_aux_file_policy=pageserver_aux_file_policy,
|
||||
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
|
||||
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
|
||||
shared_initdb_cache_dir=shared_initdb_cache_dir
|
||||
) as builder:
|
||||
env = builder.init_start()
|
||||
|
||||
@@ -1383,6 +1403,7 @@ def neon_env_builder(
|
||||
pageserver_aux_file_policy: Optional[AuxFileStore],
|
||||
record_property: Callable[[str, object], None],
|
||||
pageserver_io_buffer_alignment: Optional[int],
|
||||
shared_initdb_cache_dir: Optional[Path],
|
||||
) -> Iterator[NeonEnvBuilder]:
|
||||
"""
|
||||
Fixture to create a Neon environment for test.
|
||||
@@ -1418,6 +1439,7 @@ def neon_env_builder(
|
||||
pageserver_aux_file_policy=pageserver_aux_file_policy,
|
||||
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
|
||||
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
|
||||
shared_initdb_cache_dir=shared_initdb_cache_dir
|
||||
) as builder:
|
||||
yield builder
|
||||
# Propogate `preserve_database_files` to make it possible to use in other fixtures,
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: a317b9b5b9...f9c51c1243
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 6f6d77fb59...1dbd6f3164
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 0baa7346df...d009084a74
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 9156d63ce2...dadd6fe208
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17rc1",
|
||||
"9156d63ce253bed9d1f76355ceec610e444eaffa"
|
||||
"dadd6fe208bb906cc0a48980f2ab4e13c47ba3ad"
|
||||
],
|
||||
"v16": [
|
||||
"16.4",
|
||||
"0baa7346dfd42d61912eeca554c9bb0a190f0a1e"
|
||||
"d009084a745cb4d5e6de222c778b2a562c8b2767"
|
||||
],
|
||||
"v15": [
|
||||
"15.8",
|
||||
"6f6d77fb5960602fcd3fd130aca9f99ecb1619c9"
|
||||
"1dbd6f316416c8360bbd4f3d6db956cf70937cf0"
|
||||
],
|
||||
"v14": [
|
||||
"14.13",
|
||||
"a317b9b5b96978b49e78986697f3dd80d06f99a7"
|
||||
"f9c51c12438b20049b6905eb4e43d321defd6ff2"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user