mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 04:30:38 +00:00
Compare commits
24 Commits
skyzh/page
...
bodobolero
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d1e6a3e8b9 | ||
|
|
be3c261715 | ||
|
|
42cd6f7bee | ||
|
|
f457cef8d4 | ||
|
|
d763caa3a9 | ||
|
|
4d99c10c5e | ||
|
|
ce1e575db1 | ||
|
|
a12369be43 | ||
|
|
6d77432ed2 | ||
|
|
2a5b0d1b99 | ||
|
|
b811ae4fe5 | ||
|
|
0c6defd8da | ||
|
|
9584f65950 | ||
|
|
ef81d0b81d | ||
|
|
e019b82d87 | ||
|
|
cfe9a8ad11 | ||
|
|
f72a1505e6 | ||
|
|
4ba997c3e5 | ||
|
|
1882674a8a | ||
|
|
2033aeead1 | ||
|
|
d84c534922 | ||
|
|
fea8c98b59 | ||
|
|
eba08ab0a8 | ||
|
|
ccf32412eb |
@@ -76,14 +76,7 @@ pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
|
||||
mfs
|
||||
}
|
||||
|
||||
static DISK_IO_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"libmetrics_disk_io_bytes_total",
|
||||
"Bytes written and read from disk, grouped by the operation (read|write)",
|
||||
&["io_operation"]
|
||||
)
|
||||
.expect("Failed to register disk i/o bytes int gauge vec")
|
||||
});
|
||||
|
||||
|
||||
static MAXRSS_KB: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
@@ -261,12 +254,7 @@ const BYTES_IN_BLOCK: i64 = 512;
|
||||
fn update_rusage_metrics() {
|
||||
let rusage_stats = get_rusage_stats();
|
||||
|
||||
DISK_IO_BYTES
|
||||
.with_label_values(&["read"])
|
||||
.set(rusage_stats.ru_inblock * BYTES_IN_BLOCK);
|
||||
DISK_IO_BYTES
|
||||
.with_label_values(&["write"])
|
||||
.set(rusage_stats.ru_oublock * BYTES_IN_BLOCK);
|
||||
|
||||
|
||||
// On macOS, the unit of maxrss is bytes; on Linux, it's kilobytes. https://stackoverflow.com/a/59915669
|
||||
#[cfg(target_os = "macos")]
|
||||
@@ -357,10 +345,7 @@ impl<P: Atomic> GenericCounterPairVec<P> {
|
||||
self.get_metric_with_label_values(vals).unwrap()
|
||||
}
|
||||
|
||||
pub fn remove_label_values(&self, res: &mut [prometheus::Result<()>; 2], vals: &[&str]) {
|
||||
res[0] = self.inc.remove_label_values(vals);
|
||||
res[1] = self.dec.remove_label_values(vals);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<P: Atomic> GenericCounterPair<P> {
|
||||
|
||||
@@ -23,7 +23,6 @@ use futures::future::Either;
|
||||
use futures::stream::Stream;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use http_types::{StatusCode, Url};
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::debug;
|
||||
use utils::backoff;
|
||||
@@ -32,7 +31,7 @@ use utils::backoff::exponential_backoff_duration_seconds;
|
||||
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
|
||||
use crate::config::AzureConfig;
|
||||
use crate::error::Cancelled;
|
||||
use crate::metrics::{AttemptOutcome, RequestKind, start_measuring_requests};
|
||||
use crate::metrics::RequestKind;
|
||||
use crate::{
|
||||
ConcurrencyLimiter, Download, DownloadError, DownloadKind, DownloadOpts, Listing, ListingMode,
|
||||
ListingObject, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
|
||||
@@ -165,7 +164,7 @@ impl AzureBlobStorage {
|
||||
let mut last_modified = None;
|
||||
let mut metadata = HashMap::new();
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
let download = async {
|
||||
let response = builder
|
||||
@@ -237,19 +236,8 @@ impl AzureBlobStorage {
|
||||
TimeoutOrCancel::Cancel => return Err(DownloadError::Cancelled),
|
||||
},
|
||||
};
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
let outcome = match &download {
|
||||
Ok(_) => AttemptOutcome::Ok,
|
||||
// At this level in the stack 404 and 304 responses do not indicate an error.
|
||||
// There's expected cases when a blob may not exist or hasn't been modified since
|
||||
// the last get (e.g. probing for timeline indices and heatmap downloads).
|
||||
// Callers should handle errors if they are unexpected.
|
||||
Err(DownloadError::NotFound | DownloadError::Unmodified) => AttemptOutcome::Ok,
|
||||
Err(_) => AttemptOutcome::Err,
|
||||
};
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, outcome, started_at);
|
||||
|
||||
|
||||
download
|
||||
}
|
||||
|
||||
@@ -431,7 +419,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
let kind = RequestKind::Head;
|
||||
let _permit = self.permit(kind, cancel).await?;
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(key));
|
||||
let properties_future = blob_client.get_properties().into_future();
|
||||
@@ -443,12 +431,9 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
|
||||
};
|
||||
|
||||
if let Ok(inner) = &res {
|
||||
// do not incl. timeouts as errors in metrics but cancellations
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, inner, started_at);
|
||||
if let Ok(_inner) = &res {
|
||||
|
||||
|
||||
}
|
||||
|
||||
let data = match res {
|
||||
@@ -476,7 +461,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
let kind = RequestKind::Put;
|
||||
let _permit = self.permit(kind, cancel).await?;
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
let op = async {
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
|
||||
@@ -509,14 +494,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
|
||||
};
|
||||
|
||||
let outcome = match res {
|
||||
Ok(_) => AttemptOutcome::Ok,
|
||||
Err(_) => AttemptOutcome::Err,
|
||||
};
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, outcome, started_at);
|
||||
|
||||
|
||||
res
|
||||
}
|
||||
@@ -562,7 +540,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = RequestKind::Delete;
|
||||
let _permit = self.permit(kind, cancel).await?;
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
let op = async {
|
||||
// TODO batch requests are not supported by the SDK
|
||||
@@ -628,10 +606,8 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
|
||||
};
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, &res, started_at);
|
||||
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
@@ -647,7 +623,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = RequestKind::Copy;
|
||||
let _permit = self.permit(kind, cancel).await?;
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
let timeout = tokio::time::sleep(self.timeout);
|
||||
|
||||
@@ -701,10 +677,8 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
},
|
||||
};
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, &res, started_at);
|
||||
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use metrics::{
|
||||
Histogram, IntCounter, register_histogram_vec, register_int_counter, register_int_counter_vec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
pub(super) static BUCKET_METRICS: Lazy<BucketMetrics> = Lazy::new(Default::default);
|
||||
|
||||
|
||||
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub(crate) enum RequestKind {
|
||||
@@ -16,62 +14,9 @@ pub(crate) enum RequestKind {
|
||||
Head = 6,
|
||||
}
|
||||
|
||||
use RequestKind::*;
|
||||
|
||||
use scopeguard::ScopeGuard;
|
||||
|
||||
impl RequestKind {
|
||||
const fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Get => "get_object",
|
||||
Put => "put_object",
|
||||
Delete => "delete_object",
|
||||
List => "list_objects",
|
||||
Copy => "copy_object",
|
||||
TimeTravel => "time_travel_recover",
|
||||
Head => "head_object",
|
||||
}
|
||||
}
|
||||
const fn as_index(&self) -> usize {
|
||||
*self as usize
|
||||
}
|
||||
}
|
||||
|
||||
const REQUEST_KIND_COUNT: usize = 7;
|
||||
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
|
||||
|
||||
impl<C> RequestTyped<C> {
|
||||
pub(crate) fn get(&self, kind: RequestKind) -> &C {
|
||||
&self.0[kind.as_index()]
|
||||
}
|
||||
|
||||
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
|
||||
use RequestKind::*;
|
||||
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
|
||||
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
|
||||
let next = it.next().unwrap();
|
||||
assert_eq!(index, next.as_index());
|
||||
f(next)
|
||||
});
|
||||
|
||||
if let Some(next) = it.next() {
|
||||
panic!("unexpected {next:?}");
|
||||
}
|
||||
|
||||
RequestTyped(arr)
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestTyped<Histogram> {
|
||||
pub(crate) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) {
|
||||
self.get(kind).observe(started_at.elapsed().as_secs_f64())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct PassFailCancelledRequestTyped<C> {
|
||||
success: RequestTyped<C>,
|
||||
fail: RequestTyped<C>,
|
||||
cancelled: RequestTyped<C>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(crate) enum AttemptOutcome {
|
||||
@@ -89,138 +34,22 @@ impl<T, E> From<&Result<T, E>> for AttemptOutcome {
|
||||
}
|
||||
}
|
||||
|
||||
impl AttemptOutcome {
|
||||
pub(crate) fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
AttemptOutcome::Ok => "ok",
|
||||
AttemptOutcome::Err => "err",
|
||||
AttemptOutcome::Cancelled => "cancelled",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> PassFailCancelledRequestTyped<C> {
|
||||
pub(crate) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C {
|
||||
let target = match outcome {
|
||||
AttemptOutcome::Ok => &self.success,
|
||||
AttemptOutcome::Err => &self.fail,
|
||||
AttemptOutcome::Cancelled => &self.cancelled,
|
||||
};
|
||||
target.get(kind)
|
||||
}
|
||||
|
||||
fn build_with(mut f: impl FnMut(RequestKind, AttemptOutcome) -> C) -> Self {
|
||||
let success = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Ok));
|
||||
let fail = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Err));
|
||||
let cancelled = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Cancelled));
|
||||
|
||||
PassFailCancelledRequestTyped {
|
||||
success,
|
||||
fail,
|
||||
cancelled,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PassFailCancelledRequestTyped<Histogram> {
|
||||
pub(crate) fn observe_elapsed(
|
||||
&self,
|
||||
kind: RequestKind,
|
||||
outcome: impl Into<AttemptOutcome>,
|
||||
started_at: std::time::Instant,
|
||||
) {
|
||||
self.get(kind, outcome.into())
|
||||
.observe(started_at.elapsed().as_secs_f64())
|
||||
}
|
||||
}
|
||||
|
||||
/// On drop (cancellation) count towards [`BucketMetrics::cancelled_waits`].
|
||||
pub(crate) fn start_counting_cancelled_wait(
|
||||
kind: RequestKind,
|
||||
) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
|
||||
scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.cancelled_waits
|
||||
.get(kind)
|
||||
.inc()
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// On drop (cancellation) add time to [`BucketMetrics::req_seconds`].
|
||||
pub(crate) fn start_measuring_requests(
|
||||
kind: RequestKind,
|
||||
_kind: RequestKind,
|
||||
) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
|
||||
scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
AttemptOutcome::Cancelled,
|
||||
started_at,
|
||||
)
|
||||
scopeguard::guard_on_success(std::time::Instant::now(), move |_started_at| {
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) struct BucketMetrics {
|
||||
/// Full request duration until successful completion, error or cancellation.
|
||||
pub(crate) req_seconds: PassFailCancelledRequestTyped<Histogram>,
|
||||
/// Total amount of seconds waited on queue.
|
||||
pub(crate) wait_seconds: RequestTyped<Histogram>,
|
||||
|
||||
/// Track how many semaphore awaits were cancelled per request type.
|
||||
///
|
||||
/// This is in case cancellations are happening more than expected.
|
||||
pub(crate) cancelled_waits: RequestTyped<IntCounter>,
|
||||
|
||||
/// Total amount of deleted objects in batches or single requests.
|
||||
pub(crate) deleted_objects_total: IntCounter,
|
||||
}
|
||||
|
||||
impl Default for BucketMetrics {
|
||||
fn default() -> Self {
|
||||
// first bucket 100 microseconds to count requests that do not need to wait at all
|
||||
// and get a permit immediately
|
||||
let buckets = [0.0001, 0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
|
||||
|
||||
let req_seconds = register_histogram_vec!(
|
||||
"remote_storage_s3_request_seconds",
|
||||
"Seconds to complete a request",
|
||||
&["request_type", "result"],
|
||||
buckets.to_vec(),
|
||||
)
|
||||
.unwrap();
|
||||
let req_seconds = PassFailCancelledRequestTyped::build_with(|kind, outcome| {
|
||||
req_seconds.with_label_values(&[kind.as_str(), outcome.as_str()])
|
||||
});
|
||||
|
||||
let wait_seconds = register_histogram_vec!(
|
||||
"remote_storage_s3_wait_seconds",
|
||||
"Seconds rate limited",
|
||||
&["request_type"],
|
||||
buckets.to_vec(),
|
||||
)
|
||||
.unwrap();
|
||||
let wait_seconds =
|
||||
RequestTyped::build_with(|kind| wait_seconds.with_label_values(&[kind.as_str()]));
|
||||
|
||||
let cancelled_waits = register_int_counter_vec!(
|
||||
"remote_storage_s3_cancelled_waits_total",
|
||||
"Times a semaphore wait has been cancelled per request type",
|
||||
&["request_type"],
|
||||
)
|
||||
.unwrap();
|
||||
let cancelled_waits =
|
||||
RequestTyped::build_with(|kind| cancelled_waits.with_label_values(&[kind.as_str()]));
|
||||
|
||||
let deleted_objects_total = register_int_counter!(
|
||||
"remote_storage_s3_deleted_objects_total",
|
||||
"Amount of deleted objects in total",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
req_seconds,
|
||||
wait_seconds,
|
||||
cancelled_waits,
|
||||
deleted_objects_total,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ use super::StorageMetadata;
|
||||
use crate::config::S3Config;
|
||||
use crate::error::Cancelled;
|
||||
pub(super) use crate::metrics::RequestKind;
|
||||
use crate::metrics::{AttemptOutcome, start_counting_cancelled_wait, start_measuring_requests};
|
||||
use crate::metrics::{AttemptOutcome, start_measuring_requests};
|
||||
use crate::support::PermitCarrying;
|
||||
use crate::{
|
||||
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
|
||||
@@ -199,7 +199,7 @@ impl S3Bucket {
|
||||
kind: RequestKind,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
|
||||
let started_at = start_counting_cancelled_wait(kind);
|
||||
|
||||
let acquire = self.concurrency_limiter.acquire(kind);
|
||||
|
||||
let permit = tokio::select! {
|
||||
@@ -207,10 +207,8 @@ impl S3Bucket {
|
||||
_ = cancel.cancelled() => return Err(Cancelled),
|
||||
};
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.wait_seconds
|
||||
.observe_elapsed(kind, started_at);
|
||||
|
||||
|
||||
|
||||
Ok(permit)
|
||||
}
|
||||
@@ -220,7 +218,7 @@ impl S3Bucket {
|
||||
kind: RequestKind,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
|
||||
let started_at = start_counting_cancelled_wait(kind);
|
||||
|
||||
let acquire = self.concurrency_limiter.acquire_owned(kind);
|
||||
|
||||
let permit = tokio::select! {
|
||||
@@ -228,10 +226,8 @@ impl S3Bucket {
|
||||
_ = cancel.cancelled() => return Err(Cancelled),
|
||||
};
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.wait_seconds
|
||||
.observe_elapsed(kind, started_at);
|
||||
|
||||
|
||||
Ok(permit)
|
||||
}
|
||||
|
||||
@@ -273,11 +269,7 @@ impl S3Bucket {
|
||||
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
|
||||
// an error: we expect to sometimes fetch an object and find it missing,
|
||||
// e.g. when probing for timeline indices.
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
AttemptOutcome::Ok,
|
||||
started_at,
|
||||
);
|
||||
|
||||
return Err(DownloadError::NotFound);
|
||||
}
|
||||
Err(SdkError::ServiceError(e))
|
||||
@@ -287,19 +279,11 @@ impl S3Bucket {
|
||||
if e.raw().status().as_u16() == StatusCode::NotModified =>
|
||||
{
|
||||
// Count an unmodified file as a success.
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
AttemptOutcome::Ok,
|
||||
started_at,
|
||||
);
|
||||
|
||||
return Err(DownloadError::Unmodified);
|
||||
}
|
||||
Err(e) => {
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
AttemptOutcome::Err,
|
||||
started_at,
|
||||
);
|
||||
|
||||
|
||||
return Err(DownloadError::Other(
|
||||
anyhow::Error::new(e).context("download s3 object"),
|
||||
@@ -346,11 +330,11 @@ impl S3Bucket {
|
||||
delete_objects: &[ObjectIdentifier],
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = RequestKind::Delete;
|
||||
|
||||
let mut cancel = std::pin::pin!(cancel.cancelled());
|
||||
|
||||
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
let req = self
|
||||
.client
|
||||
@@ -370,15 +354,10 @@ impl S3Bucket {
|
||||
_ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
|
||||
};
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, &resp, started_at);
|
||||
|
||||
|
||||
|
||||
let resp = resp.context("request deletion")?;
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.deleted_objects_total
|
||||
.inc_by(chunk.len() as u64);
|
||||
|
||||
|
||||
if let Some(errors) = resp.errors {
|
||||
// Log a bounded number of the errors within the response:
|
||||
@@ -445,8 +424,8 @@ pin_project_lite::pin_project! {
|
||||
}
|
||||
|
||||
impl<S> PinnedDrop for TimedDownload<S> {
|
||||
fn drop(mut this: Pin<&mut Self>) {
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
|
||||
fn drop(mut _this: Pin<&mut Self>) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -511,7 +490,7 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
let mut continuation_token = None;
|
||||
'outer: loop {
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
// min of two Options, returning Some if one is value and another is
|
||||
// None (None is smaller than anything, so plain min doesn't work).
|
||||
@@ -544,11 +523,9 @@ impl RemoteStorage for S3Bucket {
|
||||
.context("Failed to list S3 prefixes")
|
||||
.map_err(DownloadError::Other);
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
|
||||
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, &response, started_at);
|
||||
|
||||
|
||||
let response = match response {
|
||||
Ok(response) => response,
|
||||
@@ -629,7 +606,7 @@ impl RemoteStorage for S3Bucket {
|
||||
let kind = RequestKind::Head;
|
||||
let _permit = self.permit(kind, cancel).await?;
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
let head_future = self
|
||||
.client
|
||||
@@ -648,30 +625,18 @@ impl RemoteStorage for S3Bucket {
|
||||
let res = res.map_err(|_e| DownloadError::Timeout)?;
|
||||
|
||||
// do not incl. timeouts as errors in metrics but cancellations
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, &res, started_at);
|
||||
|
||||
|
||||
|
||||
let data = match res {
|
||||
Ok(object_output) => object_output,
|
||||
Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
|
||||
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
|
||||
// an error: we expect to sometimes fetch an object and find it missing,
|
||||
// e.g. when probing for timeline indices.
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
AttemptOutcome::Ok,
|
||||
started_at,
|
||||
);
|
||||
return Err(DownloadError::NotFound);
|
||||
}
|
||||
Err(e) => {
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
AttemptOutcome::Err,
|
||||
started_at,
|
||||
);
|
||||
|
||||
|
||||
return Err(DownloadError::Other(
|
||||
anyhow::Error::new(e).context("s3 head object"),
|
||||
@@ -704,7 +669,7 @@ impl RemoteStorage for S3Bucket {
|
||||
let kind = RequestKind::Put;
|
||||
let _permit = self.permit(kind, cancel).await?;
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
|
||||
let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
|
||||
@@ -727,12 +692,10 @@ impl RemoteStorage for S3Bucket {
|
||||
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
|
||||
};
|
||||
|
||||
if let Ok(inner) = &res {
|
||||
if let Ok(_inner) = &res {
|
||||
// do not incl. timeouts as errors in metrics but cancellations
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, inner, started_at);
|
||||
|
||||
|
||||
}
|
||||
|
||||
match res {
|
||||
@@ -753,7 +716,7 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
let timeout = tokio::time::sleep(self.timeout);
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
|
||||
// we need to specify bucket_name as a prefix
|
||||
let copy_source = format!(
|
||||
@@ -777,10 +740,8 @@ impl RemoteStorage for S3Bucket {
|
||||
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
|
||||
};
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, &res, started_at);
|
||||
|
||||
|
||||
|
||||
res?;
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::fmt::Display;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use metrics::IntCounter;
|
||||
|
||||
/// Circuit breakers are for operations that are expensive and fallible.
|
||||
///
|
||||
@@ -54,7 +53,7 @@ impl CircuitBreaker {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fail<E>(&mut self, metric: &IntCounter, error: E)
|
||||
pub fn fail<E>(&mut self, error: E)
|
||||
where
|
||||
E: Display,
|
||||
{
|
||||
@@ -64,18 +63,18 @@ impl CircuitBreaker {
|
||||
|
||||
self.fail_count += 1;
|
||||
if self.broken_at.is_none() && self.fail_count >= self.fail_threshold {
|
||||
self.break_circuit(metric, error);
|
||||
self.break_circuit( error);
|
||||
}
|
||||
}
|
||||
|
||||
/// Call this after successfully executing an operation
|
||||
pub fn success(&mut self, metric: &IntCounter) {
|
||||
pub fn success(&mut self) {
|
||||
self.fail_count = 0;
|
||||
if let Some(broken_at) = &self.broken_at {
|
||||
tracing::info!(breaker=%self.name, "Circuit breaker failure ended (was broken for {})",
|
||||
humantime::format_duration(broken_at.elapsed()));
|
||||
self.broken_at = None;
|
||||
metric.inc();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,13 +97,13 @@ impl CircuitBreaker {
|
||||
}
|
||||
}
|
||||
|
||||
fn break_circuit<E>(&mut self, metric: &IntCounter, error: E)
|
||||
fn break_circuit<E>(&mut self, error: E)
|
||||
where
|
||||
E: Display,
|
||||
{
|
||||
self.broken_at = Some(Instant::now());
|
||||
tracing::error!(breaker=%self.name, "Circuit breaker broken! Last error: {error}");
|
||||
metric.inc();
|
||||
|
||||
}
|
||||
|
||||
fn reset_circuit(&mut self) {
|
||||
|
||||
@@ -20,7 +20,6 @@ use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
|
||||
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
|
||||
use pageserver::deletion_queue::DeletionQueue;
|
||||
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
|
||||
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
|
||||
use pageserver::task_mgr::{
|
||||
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
|
||||
};
|
||||
@@ -321,10 +320,9 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
|
||||
fn startup_checkpoint(started_at: Instant, _phase: &str, human_phase: &str) {
|
||||
let elapsed = started_at.elapsed();
|
||||
let secs = elapsed.as_secs_f64();
|
||||
STARTUP_DURATION.with_label_values(&[phase]).set(secs);
|
||||
|
||||
info!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
@@ -355,10 +353,7 @@ fn start_pageserver(
|
||||
set_launch_timestamp_metric(launch_ts);
|
||||
#[cfg(target_os = "linux")]
|
||||
metrics::register_internal(Box::new(metrics::more_process_metrics::Collector::new())).unwrap();
|
||||
metrics::register_internal(Box::new(
|
||||
pageserver::metrics::tokio_epoll_uring::Collector::new(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
pageserver::preinitialize_metrics(conf, ignored);
|
||||
|
||||
// If any failpoints were set from FAILPOINTS environment variable,
|
||||
@@ -502,7 +497,6 @@ fn start_pageserver(
|
||||
// Up to this point no significant I/O has been done: this should have been fast. Record
|
||||
// duration prior to starting I/O intensive phase of startup.
|
||||
startup_checkpoint(started_startup_at, "initial", "Starting loading tenants");
|
||||
STARTUP_IS_LOADING.set(1);
|
||||
|
||||
// Startup staging or optimizing:
|
||||
//
|
||||
@@ -578,7 +572,6 @@ fn start_pageserver(
|
||||
"initial_tenant_load",
|
||||
"Initial load completed",
|
||||
);
|
||||
STARTUP_IS_LOADING.set(0);
|
||||
});
|
||||
|
||||
let WaitForPhaseResult {
|
||||
|
||||
@@ -261,7 +261,7 @@ where
|
||||
let mut tenants = std::pin::pin!(tenants);
|
||||
|
||||
while let Some((tenant_id, tenant)) = tenants.next().await {
|
||||
let mut tenant_resident_size = 0;
|
||||
let tenant_resident_size = 0;
|
||||
|
||||
for timeline in tenant.list_timelines() {
|
||||
let timeline_id = timeline.timeline_id;
|
||||
@@ -286,7 +286,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
tenant_resident_size += timeline.resident_physical_size();
|
||||
}
|
||||
|
||||
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
|
||||
|
||||
@@ -91,12 +91,12 @@
|
||||
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use tracing::warn;
|
||||
use utils::{id::TimelineId, shard::TenantShardId};
|
||||
|
||||
use crate::{
|
||||
metrics::{StorageIoSizeMetrics, TimelineMetrics},
|
||||
metrics::TimelineMetrics,
|
||||
task_mgr::TaskKind,
|
||||
tenant::Timeline,
|
||||
};
|
||||
@@ -122,38 +122,35 @@ pub struct RequestContext {
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum Scope {
|
||||
Global {
|
||||
io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
|
||||
|
||||
},
|
||||
SecondaryTenant {
|
||||
io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
|
||||
|
||||
},
|
||||
SecondaryTimeline {
|
||||
io_size_metrics: crate::metrics::StorageIoSizeMetrics,
|
||||
|
||||
},
|
||||
Timeline {
|
||||
// We wrap the `Arc<TimelineMetrics>`s inside another Arc to avoid child
|
||||
// We wrap the `Arc<TimelineMetrics>`s inside another Arc to avoid child
|
||||
// context creation contending for the ref counters of the Arc<TimelineMetrics>,
|
||||
// which are shared among all tasks that operate on the timeline, especially
|
||||
// concurrent page_service connections.
|
||||
#[allow(clippy::redundant_allocation)]
|
||||
arc_arc: Arc<Arc<TimelineMetrics>>,
|
||||
},
|
||||
#[allow(dead_code)]
|
||||
arc_arc: Arc<Arc<TimelineMetrics>>, },
|
||||
#[cfg(test)]
|
||||
UnitTest {
|
||||
io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
|
||||
|
||||
},
|
||||
DebugTools {
|
||||
io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
|
||||
|
||||
},
|
||||
}
|
||||
|
||||
static GLOBAL_IO_SIZE_METRICS: Lazy<crate::metrics::StorageIoSizeMetrics> =
|
||||
Lazy::new(|| crate::metrics::StorageIoSizeMetrics::new("*", "*", "*"));
|
||||
|
||||
impl Scope {
|
||||
pub(crate) fn new_global() -> Self {
|
||||
Scope::Global {
|
||||
io_size_metrics: &GLOBAL_IO_SIZE_METRICS,
|
||||
}
|
||||
}
|
||||
/// NB: this allocates, so, use only at relatively long-lived roots, e.g., at start
|
||||
@@ -173,18 +170,13 @@ impl Scope {
|
||||
}
|
||||
}
|
||||
pub(crate) fn new_secondary_timeline(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
_tenant_shard_id: &TenantShardId,
|
||||
_timeline_id: &TimelineId,
|
||||
) -> Self {
|
||||
// TODO(https://github.com/neondatabase/neon/issues/11156): secondary timelines have no infrastructure for metrics lifecycle.
|
||||
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = tenant_shard_id.shard_slug().to_string();
|
||||
let timeline_id = timeline_id.to_string();
|
||||
|
||||
let io_size_metrics =
|
||||
crate::metrics::StorageIoSizeMetrics::new(&tenant_id, &shard_id, &timeline_id);
|
||||
Scope::SecondaryTimeline { io_size_metrics }
|
||||
Scope::SecondaryTimeline { }
|
||||
}
|
||||
pub(crate) fn new_secondary_tenant(_tenant_shard_id: &TenantShardId) -> Self {
|
||||
// Before propagating metrics via RequestContext, the labels were inferred from file path.
|
||||
@@ -197,19 +189,19 @@ impl Scope {
|
||||
// like we do for attached timelines. (We don't have attached-tenant-scoped usage of VirtualFile
|
||||
// at this point, so, we were able to completely side-step tenant-scoped stuff there).
|
||||
Scope::SecondaryTenant {
|
||||
io_size_metrics: &GLOBAL_IO_SIZE_METRICS,
|
||||
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
pub(crate) fn new_unit_test() -> Self {
|
||||
Scope::UnitTest {
|
||||
io_size_metrics: &GLOBAL_IO_SIZE_METRICS,
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_debug_tools() -> Self {
|
||||
Scope::DebugTools {
|
||||
io_size_metrics: &GLOBAL_IO_SIZE_METRICS,
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -523,58 +515,18 @@ impl RequestContext {
|
||||
self.access_stats_behavior
|
||||
}
|
||||
|
||||
pub(crate) fn page_content_kind(&self) -> PageContentKind {
|
||||
self.page_content_kind
|
||||
}
|
||||
|
||||
pub(crate) fn read_path_debug(&self) -> bool {
|
||||
self.read_path_debug
|
||||
}
|
||||
|
||||
pub(crate) fn io_size_metrics(&self) -> &StorageIoSizeMetrics {
|
||||
match &self.scope {
|
||||
Scope::Global { io_size_metrics } => {
|
||||
let is_unit_test = cfg!(test);
|
||||
let is_regress_test_build = cfg!(feature = "testing");
|
||||
if is_unit_test || is_regress_test_build {
|
||||
panic!("all VirtualFile instances are timeline-scoped");
|
||||
} else {
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LIMIT: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
|
||||
let mut guard = LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
warn!(
|
||||
%rate_limit_stats,
|
||||
backtrace=%std::backtrace::Backtrace::force_capture(),
|
||||
"all VirtualFile instances are timeline-scoped",
|
||||
);
|
||||
});
|
||||
|
||||
io_size_metrics
|
||||
}
|
||||
}
|
||||
Scope::Timeline { arc_arc } => &arc_arc.storage_io_size,
|
||||
Scope::SecondaryTimeline { io_size_metrics } => io_size_metrics,
|
||||
Scope::SecondaryTenant { io_size_metrics } => io_size_metrics,
|
||||
#[cfg(test)]
|
||||
Scope::UnitTest { io_size_metrics } => io_size_metrics,
|
||||
Scope::DebugTools { io_size_metrics } => io_size_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn ondemand_download_wait_observe(&self, duration: Duration) {
|
||||
if duration == Duration::ZERO {
|
||||
return;
|
||||
}
|
||||
|
||||
match &self.scope {
|
||||
Scope::Timeline { arc_arc } => arc_arc
|
||||
.wait_ondemand_download_time
|
||||
.observe(self.task_kind, duration),
|
||||
Scope::Timeline { arc_arc: _ } => {},
|
||||
_ => {
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
|
||||
@@ -27,7 +27,6 @@ use self::list_writer::{DeletionOp, ListWriter, RecoverOp};
|
||||
use self::validator::Validator;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::controller_upcall_client::StorageControllerUpcallApi;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::{LayerFileMetadata, remote_timeline_path};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
@@ -163,11 +162,6 @@ struct TenantDeletionList {
|
||||
generation: Generation,
|
||||
}
|
||||
|
||||
impl TenantDeletionList {
|
||||
pub(crate) fn len(&self) -> usize {
|
||||
self.timelines.values().map(|v| v.len()).sum()
|
||||
}
|
||||
}
|
||||
|
||||
/// Files ending with this suffix will be ignored and erased
|
||||
/// during recovery as startup.
|
||||
@@ -467,9 +461,6 @@ impl DeletionQueueClient {
|
||||
// they may be historical.
|
||||
assert!(!current_generation.is_none());
|
||||
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
.inc_by(layers.len() as u64);
|
||||
self.do_push(
|
||||
&self.tx,
|
||||
ListWriterQueueMessage::Delete(DeletionOp {
|
||||
@@ -553,9 +544,6 @@ impl DeletionQueueClient {
|
||||
&self,
|
||||
objects: Vec<RemotePath>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
.inc_by(objects.len() as u64);
|
||||
self.executor_tx
|
||||
.send(DeleterMessage::Delete(objects))
|
||||
.await
|
||||
|
||||
@@ -14,7 +14,6 @@ use tracing::{info, warn};
|
||||
use utils::{backoff, pausable_failpoint};
|
||||
|
||||
use super::{DeletionQueueError, FlushOp};
|
||||
use crate::metrics;
|
||||
|
||||
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
@@ -60,10 +59,6 @@ impl Deleter {
|
||||
fail::fail_point!("deletion-queue-before-execute", |_| {
|
||||
info!("Skipping execution, failpoint set");
|
||||
|
||||
metrics::DELETION_QUEUE
|
||||
.remote_errors
|
||||
.with_label_values(&["failpoint"])
|
||||
.inc();
|
||||
Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
|
||||
});
|
||||
|
||||
@@ -90,9 +85,6 @@ impl Deleter {
|
||||
Ok(()) => {
|
||||
// Note: we assume that the remote storage layer returns Ok(()) if some
|
||||
// or all of the deleted objects were already gone.
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_executed
|
||||
.inc_by(self.accumulator.len() as u64);
|
||||
info!(
|
||||
"Executed deletion batch {}..{}",
|
||||
self.accumulator
|
||||
@@ -109,10 +101,6 @@ impl Deleter {
|
||||
return Err(DeletionQueueError::ShuttingDown);
|
||||
}
|
||||
warn!("DeleteObjects request failed: {e:#}, will continue trying");
|
||||
metrics::DELETION_QUEUE
|
||||
.remote_errors
|
||||
.with_label_values(&["execute"])
|
||||
.inc();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ use utils::id::TimelineId;
|
||||
use super::{DeletionHeader, DeletionList, FlushOp, ValidatorQueueMessage};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::deletion_queue::TEMP_SUFFIX;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::{LayerFileMetadata, remote_layer_path};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::virtual_file::{MaybeFatalIo, on_fatal_io_error};
|
||||
@@ -152,7 +151,7 @@ impl ListWriter {
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
|
||||
warn!(
|
||||
sequence = self.pending.sequence,
|
||||
"Failed to write deletion list, will retry later ({e:#})"
|
||||
@@ -180,7 +179,6 @@ impl ListWriter {
|
||||
// This should never happen unless we make a mistake with our serialization.
|
||||
// Ignoring a deletion header is not consequential for correctnes because all deletions
|
||||
// are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
@@ -249,7 +247,6 @@ impl ListWriter {
|
||||
.as_str()
|
||||
} else {
|
||||
warn!("Unexpected key in deletion queue: {basename}");
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
continue;
|
||||
};
|
||||
|
||||
@@ -257,7 +254,6 @@ impl ListWriter {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!("Malformed key '{basename}': {e}");
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -286,7 +282,6 @@ impl ListWriter {
|
||||
// Drop the list on the floor: any objects it referenced will be left behind
|
||||
// for scrubbing to clean up. This should never happen unless we have a serialization bug.
|
||||
warn!(sequence = s, "Failed to deserialize deletion list: {e}");
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -329,9 +324,6 @@ impl ListWriter {
|
||||
|
||||
// We will drop out of recovery if this fails: it indicates that we are shutting down
|
||||
// or the backend has panicked
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
.inc_by(deletion_list.len() as u64);
|
||||
self.tx
|
||||
.send(ValidatorQueueMessage::Delete(deletion_list))
|
||||
.await?;
|
||||
@@ -353,7 +345,6 @@ impl ListWriter {
|
||||
"Failed to create deletion list directory {}, deletions will not be executed ({e})",
|
||||
self.conf.deletion_prefix(),
|
||||
);
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -422,7 +413,6 @@ impl ListWriter {
|
||||
tracing::error!(
|
||||
"Failed to enqueue deletions, leaking objects. This is a bug."
|
||||
);
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -450,7 +440,6 @@ impl ListWriter {
|
||||
tracing::error!(
|
||||
"Deletion queue recovery called more than once. This is a bug."
|
||||
);
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
// Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
|
||||
continue;
|
||||
}
|
||||
@@ -462,7 +451,6 @@ impl ListWriter {
|
||||
info!(
|
||||
"Deletion queue recover aborted, deletion queue will not proceed ({e})"
|
||||
);
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
return;
|
||||
} else {
|
||||
self.recovered = true;
|
||||
|
||||
@@ -26,7 +26,6 @@ use super::deleter::DeleterMessage;
|
||||
use super::{DeletionHeader, DeletionList, DeletionQueueError, FlushOp, VisibleLsnUpdates};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::controller_upcall_client::{RetryForeverError, StorageControllerUpcallApi};
|
||||
use crate::metrics;
|
||||
use crate::virtual_file::MaybeFatalIo;
|
||||
|
||||
// After this length of time, do any validation work that is pending,
|
||||
@@ -186,7 +185,6 @@ where
|
||||
"Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}",
|
||||
tenant_lsn_state.generation
|
||||
);
|
||||
metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,11 +219,8 @@ where
|
||||
|
||||
if !this_list_valid {
|
||||
info!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
|
||||
metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
|
||||
mutated = true;
|
||||
} else {
|
||||
metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
|
||||
}
|
||||
}
|
||||
this_list_valid
|
||||
});
|
||||
list.validated = true;
|
||||
@@ -237,7 +232,7 @@ where
|
||||
// Highly unexpected. Could happen if e.g. disk full.
|
||||
// If we didn't save the trimmed list, it is _not_ valid to execute.
|
||||
warn!("Failed to save modified deletion list {list}: {e:#}");
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
|
||||
|
||||
// Rather than have a complex retry process, just drop it and leak the objects,
|
||||
// scrubber will clean up eventually.
|
||||
@@ -276,7 +271,7 @@ where
|
||||
// The save() function logs a warning on error.
|
||||
if let Err(e) = header.save(self.conf).await {
|
||||
warn!("Failed to write deletion queue header: {e:#}");
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,7 +56,6 @@ use utils::completion;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::disk_usage_based_eviction::METRICS;
|
||||
use crate::task_mgr::{self, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::mgr::TenantManager;
|
||||
use crate::tenant::remote_timeline_client::LayerFileMetadata;
|
||||
@@ -388,7 +387,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
}
|
||||
};
|
||||
|
||||
METRICS.layers_collected.inc_by(candidates.len() as u64);
|
||||
|
||||
|
||||
tracing::info!(
|
||||
elapsed_ms = collection_time.as_millis(),
|
||||
@@ -428,7 +427,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
let (evicted_amount, usage_planned) =
|
||||
select_victims(&candidates, usage_pre).into_amount_and_planned();
|
||||
|
||||
METRICS.layers_selected.inc_by(evicted_amount as u64);
|
||||
|
||||
|
||||
// phase2: evict layers
|
||||
|
||||
@@ -457,7 +456,6 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
if let Some(next) = next {
|
||||
match next {
|
||||
Ok(Ok(file_size)) => {
|
||||
METRICS.layers_evicted.inc();
|
||||
usage_assumed.add_available_bytes(file_size);
|
||||
}
|
||||
Ok(Err((
|
||||
@@ -788,7 +786,6 @@ async fn collect_eviction_candidates(
|
||||
eviction_order: EvictionOrder,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<EvictionCandidates> {
|
||||
const LOG_DURATION_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
// get a snapshot of the list of tenants
|
||||
let tenants = tenant_manager
|
||||
@@ -822,7 +819,7 @@ async fn collect_eviction_candidates(
|
||||
continue;
|
||||
}
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
|
||||
// collect layers from all timelines in this tenant
|
||||
//
|
||||
@@ -917,25 +914,11 @@ async fn collect_eviction_candidates(
|
||||
(partition, candidate)
|
||||
});
|
||||
|
||||
METRICS
|
||||
.tenant_layer_count
|
||||
.observe(tenant_candidates.len() as f64);
|
||||
|
||||
candidates.extend(tenant_candidates);
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
METRICS
|
||||
.tenant_collection_time
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
|
||||
if elapsed > LOG_DURATION_THRESHOLD {
|
||||
tracing::info!(
|
||||
tenant_id=%tenant.tenant_shard_id().tenant_id,
|
||||
shard_id=%tenant.tenant_shard_id().shard_slug(),
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"collection took longer than threshold"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Note: the same tenant ID might be hit twice, if it transitions from attached to
|
||||
@@ -962,7 +945,7 @@ async fn collect_eviction_candidates(
|
||||
layer_info.resident_layers.len()
|
||||
);
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
|
||||
layer_info
|
||||
.resident_layers
|
||||
@@ -984,28 +967,13 @@ async fn collect_eviction_candidates(
|
||||
candidate,
|
||||
)
|
||||
});
|
||||
|
||||
METRICS
|
||||
.tenant_layer_count
|
||||
.observe(tenant_candidates.len() as f64);
|
||||
candidates.extend(tenant_candidates);
|
||||
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
|
||||
|
||||
METRICS
|
||||
.tenant_collection_time
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if elapsed > LOG_DURATION_THRESHOLD {
|
||||
tracing::info!(
|
||||
tenant_id=%tenant.tenant_shard_id().tenant_id,
|
||||
shard_id=%tenant.tenant_shard_id().shard_slug(),
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"collection took longer than threshold"
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
debug_assert!(
|
||||
|
||||
@@ -24,7 +24,6 @@ use wal_decoder::models::InterpretedWalRecord;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::WAL_INGEST;
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::walingest::{WalIngest, WalIngestErrorKind};
|
||||
@@ -324,7 +323,6 @@ async fn import_wal(
|
||||
walingest
|
||||
.ingest_record(interpreted, &mut modification, ctx)
|
||||
.await?;
|
||||
WAL_INGEST.records_committed.inc();
|
||||
|
||||
modification.commit(ctx).await?;
|
||||
last_lsn = lsn;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -77,7 +77,6 @@ use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{PageCacheSizeMetrics, page_cache_eviction_metrics};
|
||||
use crate::virtual_file::{IoBufferMut, IoPageSlice};
|
||||
|
||||
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
|
||||
@@ -195,7 +194,7 @@ impl SlotInner {
|
||||
}
|
||||
|
||||
pub struct PageCache {
|
||||
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
|
||||
immutable_page_maps: [std::sync::RwLock<HashMap<(FileId, u32), usize>>; 16],
|
||||
|
||||
/// The actual buffers with their metadata.
|
||||
slots: Box<[Slot]>,
|
||||
@@ -205,8 +204,103 @@ pub struct PageCache {
|
||||
/// Index of the next candidate to evict, for the Clock replacement algorithm.
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
}
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
impl PageCache {
|
||||
/// Helper function to determine the shard index based on the low 4 bits of the u32 in the key tuple.
|
||||
fn shard_index(_file_id: &FileId, blkno: u32) -> usize {
|
||||
(blkno & 0xF) as usize
|
||||
}
|
||||
|
||||
/// Search for a page in the cache using the given search key.
|
||||
///
|
||||
/// Returns the slot index, if any.
|
||||
///
|
||||
/// NOTE: We don't hold any lock on the mapping on return, so the slot might
|
||||
/// get recycled for an unrelated page immediately after this function
|
||||
/// returns. The caller is responsible for re-checking that the slot still
|
||||
/// contains the page with the same key before using it.
|
||||
///
|
||||
fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
|
||||
match cache_key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let shard_idx = Self::shard_index(file_id, *blkno);
|
||||
let map = self.immutable_page_maps[shard_idx].read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Remove mapping for given key.
|
||||
///
|
||||
fn remove_mapping(&self, old_key: &CacheKey) {
|
||||
match old_key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let shard_idx = Self::shard_index(file_id, *blkno);
|
||||
let mut map = self.immutable_page_maps[shard_idx].write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Insert mapping for given key.
|
||||
///
|
||||
/// If a mapping already existed for the given key, returns the slot index
|
||||
/// of the existing mapping and leaves it untouched.
|
||||
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
|
||||
match new_key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let shard_idx = Self::shard_index(file_id, *blkno);
|
||||
let mut map = self.immutable_page_maps[shard_idx].write().unwrap();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(slot_idx);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize a new page cache
|
||||
///
|
||||
/// This should be called only once at page server startup.
|
||||
fn new(num_pages: usize) -> Self {
|
||||
assert!(num_pages > 0, "page cache size must be > 0");
|
||||
|
||||
// We could use Vec::leak here, but that potentially also leaks
|
||||
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
|
||||
// this is avoided.
|
||||
let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
|
||||
|
||||
let slots = page_buffer
|
||||
.chunks_exact_mut(PAGE_SZ)
|
||||
.map(|chunk| {
|
||||
// SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
|
||||
let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
|
||||
|
||||
Slot {
|
||||
inner: tokio::sync::RwLock::new(SlotInner {
|
||||
key: None,
|
||||
buf,
|
||||
permit: std::sync::Mutex::new(Weak::new()),
|
||||
}),
|
||||
usage_count: AtomicU8::new(0),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
immutable_page_maps: Default::default(),
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PinnedSlotsPermit {
|
||||
@@ -414,32 +508,17 @@ impl PageCache {
|
||||
async fn lock_for_read(
|
||||
&self,
|
||||
cache_key: &CacheKey,
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::ImmutableFilePage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_accesses_immutable,
|
||||
&crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
|
||||
),
|
||||
};
|
||||
read_access.inc();
|
||||
|
||||
let mut is_first_iteration = true;
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
|
||||
debug_assert!(permit.is_none());
|
||||
if is_first_iteration {
|
||||
hit.inc();
|
||||
}
|
||||
return Ok(ReadBufResult::Found(read_guard));
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
is_first_iteration = false;
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self
|
||||
@@ -484,63 +563,6 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Section 3: Mapping functions
|
||||
//
|
||||
|
||||
/// Search for a page in the cache using the given search key.
|
||||
///
|
||||
/// Returns the slot index, if any.
|
||||
///
|
||||
/// NOTE: We don't hold any lock on the mapping on return, so the slot might
|
||||
/// get recycled for an unrelated page immediately after this function
|
||||
/// returns. The caller is responsible for re-checking that the slot still
|
||||
/// contains the page with the same key before using it.
|
||||
///
|
||||
fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
|
||||
match cache_key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Remove mapping for given key.
|
||||
///
|
||||
fn remove_mapping(&self, old_key: &CacheKey) {
|
||||
match old_key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
self.size_metrics.current_bytes_immutable.sub_page_sz(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Insert mapping for given key.
|
||||
///
|
||||
/// If a mapping already existed for the given key, returns the slot index
|
||||
/// of the existing mapping and leaves it untouched.
|
||||
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
|
||||
match new_key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(slot_idx);
|
||||
self.size_metrics.current_bytes_immutable.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Section 4: Misc internal helpers
|
||||
//
|
||||
@@ -595,11 +617,7 @@ impl PageCache {
|
||||
// Note that just yielding to tokio during iteration without such
|
||||
// priority boosting is likely counter-productive. We'd just give more opportunities
|
||||
// for B to bump usage count, further starving A.
|
||||
page_cache_eviction_metrics::observe(
|
||||
page_cache_eviction_metrics::Outcome::ItersExceeded {
|
||||
iters: iters.try_into().unwrap(),
|
||||
},
|
||||
);
|
||||
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
}
|
||||
continue;
|
||||
@@ -609,84 +627,12 @@ impl PageCache {
|
||||
// remove mapping for old buffer
|
||||
self.remove_mapping(old_key);
|
||||
inner.key = None;
|
||||
page_cache_eviction_metrics::observe(
|
||||
page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
|
||||
iters: iters.try_into().unwrap(),
|
||||
},
|
||||
);
|
||||
} else {
|
||||
page_cache_eviction_metrics::observe(
|
||||
page_cache_eviction_metrics::Outcome::FoundSlotUnused {
|
||||
iters: iters.try_into().unwrap(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
return Ok((slot_idx, inner));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize a new page cache
|
||||
///
|
||||
/// This should be called only once at page server startup.
|
||||
fn new(num_pages: usize) -> Self {
|
||||
assert!(num_pages > 0, "page cache size must be > 0");
|
||||
|
||||
// We could use Vec::leak here, but that potentially also leaks
|
||||
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
|
||||
// this is avoided.
|
||||
let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
|
||||
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
size_metrics.max_bytes.set_page_sz(num_pages);
|
||||
size_metrics.current_bytes_immutable.set_page_sz(0);
|
||||
|
||||
let slots = page_buffer
|
||||
.chunks_exact_mut(PAGE_SZ)
|
||||
.map(|chunk| {
|
||||
// SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
|
||||
let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
|
||||
|
||||
Slot {
|
||||
inner: tokio::sync::RwLock::new(SlotInner {
|
||||
key: None,
|
||||
buf,
|
||||
permit: std::sync::Mutex::new(Weak::new()),
|
||||
}),
|
||||
usage_count: AtomicU8::new(0),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
immutable_page_map: Default::default(),
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait PageSzBytesMetric {
|
||||
fn set_page_sz(&self, count: usize);
|
||||
fn add_page_sz(&self, count: usize);
|
||||
fn sub_page_sz(&self, count: usize);
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn count_times_page_sz(count: usize) -> u64 {
|
||||
u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
|
||||
}
|
||||
|
||||
impl PageSzBytesMetric for metrics::UIntGauge {
|
||||
fn set_page_sz(&self, count: usize) {
|
||||
self.set(count_times_page_sz(count));
|
||||
}
|
||||
fn add_page_sz(&self, count: usize) {
|
||||
self.add(count_times_page_sz(count));
|
||||
}
|
||||
fn sub_page_sz(&self, count: usize) {
|
||||
self.sub(count_times_page_sz(count));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,8 +59,7 @@ use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
|
||||
SmgrOpTimer, TimelineMetrics,
|
||||
self, GetPageBatchBreakReason, SmgrOpTimer, TimelineMetrics,
|
||||
};
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::span::{
|
||||
@@ -275,9 +274,6 @@ async fn page_service_conn_main(
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
) -> ConnectionHandlerResult {
|
||||
let _guard = LIVE_CONNECTIONS
|
||||
.with_label_values(&["page_service"])
|
||||
.guard();
|
||||
|
||||
socket
|
||||
.set_nodelay(true)
|
||||
@@ -641,7 +637,6 @@ impl std::fmt::Display for BatchedPageStreamError {
|
||||
|
||||
struct BatchedGetPageRequest {
|
||||
req: PagestreamGetPageRequest,
|
||||
timer: SmgrOpTimer,
|
||||
effective_request_lsn: Lsn,
|
||||
ctx: RequestContext,
|
||||
}
|
||||
@@ -649,7 +644,6 @@ struct BatchedGetPageRequest {
|
||||
#[cfg(feature = "testing")]
|
||||
struct BatchedTestRequest {
|
||||
req: models::PagestreamTestRequest,
|
||||
timer: SmgrOpTimer,
|
||||
}
|
||||
|
||||
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
|
||||
@@ -659,13 +653,13 @@ struct BatchedTestRequest {
|
||||
enum BatchedFeMessage {
|
||||
Exists {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamExistsRequest,
|
||||
},
|
||||
Nblocks {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamNblocksRequest,
|
||||
},
|
||||
@@ -677,13 +671,13 @@ enum BatchedFeMessage {
|
||||
},
|
||||
DbSize {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamDbSizeRequest,
|
||||
},
|
||||
GetSlruSegment {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamGetSlruSegmentRequest,
|
||||
},
|
||||
@@ -704,27 +698,7 @@ impl BatchedFeMessage {
|
||||
self.into()
|
||||
}
|
||||
|
||||
fn observe_execution_start(&mut self, at: Instant) {
|
||||
match self {
|
||||
BatchedFeMessage::Exists { timer, .. }
|
||||
| BatchedFeMessage::Nblocks { timer, .. }
|
||||
| BatchedFeMessage::DbSize { timer, .. }
|
||||
| BatchedFeMessage::GetSlruSegment { timer, .. } => {
|
||||
timer.observe_execution_start(at);
|
||||
}
|
||||
BatchedFeMessage::GetPage { pages, .. } => {
|
||||
for page in pages {
|
||||
page.timer.observe_execution_start(at);
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
BatchedFeMessage::Test { requests, .. } => {
|
||||
for req in requests {
|
||||
req.timer.observe_execution_start(at);
|
||||
}
|
||||
}
|
||||
BatchedFeMessage::RespondError { .. } => {}
|
||||
}
|
||||
fn observe_execution_start(&mut self, _at: Instant) {
|
||||
}
|
||||
|
||||
fn should_break_batch(
|
||||
@@ -964,7 +938,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetRelExists,
|
||||
received_at,
|
||||
@@ -972,7 +946,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
BatchedFeMessage::Exists {
|
||||
span,
|
||||
timer,
|
||||
|
||||
shard: shard.downgrade(),
|
||||
req,
|
||||
}
|
||||
@@ -982,7 +956,7 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetRelSize,
|
||||
received_at,
|
||||
@@ -990,7 +964,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
BatchedFeMessage::Nblocks {
|
||||
span,
|
||||
timer,
|
||||
|
||||
shard: shard.downgrade(),
|
||||
req,
|
||||
}
|
||||
@@ -1000,7 +974,7 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetDbSize,
|
||||
received_at,
|
||||
@@ -1008,7 +982,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
BatchedFeMessage::DbSize {
|
||||
span,
|
||||
timer,
|
||||
|
||||
shard: shard.downgrade(),
|
||||
req,
|
||||
}
|
||||
@@ -1018,7 +992,7 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetSlruSegment,
|
||||
received_at,
|
||||
@@ -1026,7 +1000,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
BatchedFeMessage::GetSlruSegment {
|
||||
span,
|
||||
timer,
|
||||
|
||||
shard: shard.downgrade(),
|
||||
req,
|
||||
}
|
||||
@@ -1125,7 +1099,7 @@ impl PageServerHandler {
|
||||
// request handler log messages contain the request-specific fields.
|
||||
let span = mkspan!(shard.tenant_shard_id.shard_slug());
|
||||
|
||||
let timer = record_op_start_and_throttle(
|
||||
record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
received_at,
|
||||
@@ -1158,7 +1132,6 @@ impl PageServerHandler {
|
||||
shard: shard.downgrade(),
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest {
|
||||
req,
|
||||
timer,
|
||||
effective_request_lsn,
|
||||
ctx,
|
||||
}],
|
||||
@@ -1174,13 +1147,12 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer =
|
||||
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
|
||||
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
|
||||
.await?;
|
||||
BatchedFeMessage::Test {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
requests: vec![BatchedTestRequest { req, timer }],
|
||||
requests: vec![BatchedTestRequest { req, }],
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -1281,7 +1253,7 @@ impl PageServerHandler {
|
||||
|
||||
// Dispatch the batch to the appropriate request handler.
|
||||
let log_slow_name = batch.as_static_str();
|
||||
let (mut handler_results, span) = {
|
||||
let (handler_results, span) = {
|
||||
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
|
||||
// won't fit on the stack.
|
||||
let mut boxpinned =
|
||||
@@ -1311,31 +1283,31 @@ impl PageServerHandler {
|
||||
// call, which (all unmeasured) adds syscall overhead but reduces time to first byte
|
||||
// and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
|
||||
// TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
|
||||
let flush_timers = {
|
||||
let flushing_start_time = Instant::now();
|
||||
let mut flush_timers = Vec::with_capacity(handler_results.len());
|
||||
for handler_result in &mut handler_results {
|
||||
let flush_timer = match handler_result {
|
||||
Ok((_, timer)) => Some(
|
||||
timer
|
||||
.observe_execution_end(flushing_start_time)
|
||||
.expect("we are the first caller"),
|
||||
),
|
||||
Err(_) => {
|
||||
// TODO: measure errors
|
||||
None
|
||||
}
|
||||
};
|
||||
flush_timers.push(flush_timer);
|
||||
}
|
||||
assert_eq!(flush_timers.len(), handler_results.len());
|
||||
flush_timers
|
||||
};
|
||||
// let flush_timers = {
|
||||
// let flushing_start_time = Instant::now();
|
||||
// let mut flush_timers = Vec::with_capacity(handler_results.len());
|
||||
// for handler_result in &mut handler_results {
|
||||
// let flush_timer = match handler_result {
|
||||
// Ok((_, timer)) => Some(
|
||||
// timer
|
||||
// .observe_execution_end(flushing_start_time)
|
||||
// .expect("we are the first caller"),
|
||||
// ),
|
||||
// Err(_) => {
|
||||
// // TODO: measure errors
|
||||
// None
|
||||
// }
|
||||
// };
|
||||
// flush_timers.push(flush_timer);
|
||||
// }
|
||||
// assert_eq!(flush_timers.len(), handler_results.len());
|
||||
// flush_timers
|
||||
// };
|
||||
|
||||
// Map handler result to protocol behavior.
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
|
||||
for handler_result in handler_results.into_iter() {
|
||||
let response_msg = match handler_result {
|
||||
Err(e) => match &e.err {
|
||||
PageStreamError::Shutdown => {
|
||||
@@ -1367,7 +1339,7 @@ impl PageServerHandler {
|
||||
})
|
||||
}
|
||||
},
|
||||
Ok((response_msg, _op_timer_already_observed)) => response_msg,
|
||||
Ok((response_msg, )) => response_msg,
|
||||
};
|
||||
|
||||
//
|
||||
@@ -1381,17 +1353,17 @@ impl PageServerHandler {
|
||||
failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
|
||||
|
||||
// what we want to do
|
||||
let socket_fd = pgb_writer.socket_fd;
|
||||
|
||||
let flush_fut = pgb_writer.flush();
|
||||
// metric for how long flushing takes
|
||||
let flush_fut = match flushing_timer {
|
||||
Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
|
||||
Instant::now(),
|
||||
flush_fut,
|
||||
socket_fd,
|
||||
)),
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
};
|
||||
// let flush_fut = match flushing_timer {
|
||||
// Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
|
||||
// Instant::now(),
|
||||
// flush_fut,
|
||||
// socket_fd,
|
||||
// )),
|
||||
// None => futures::future::Either::Right(flush_fut),
|
||||
// };
|
||||
// do it while respecting cancellation
|
||||
let _: () = async move {
|
||||
tokio::select! {
|
||||
@@ -1421,7 +1393,7 @@ impl PageServerHandler {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<
|
||||
(
|
||||
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
|
||||
Vec<Result<(PagestreamBeMessage, ), BatchedPageStreamError>>,
|
||||
Span,
|
||||
),
|
||||
QueryError,
|
||||
@@ -1437,7 +1409,7 @@ impl PageServerHandler {
|
||||
Ok(match batch {
|
||||
BatchedFeMessage::Exists {
|
||||
span,
|
||||
timer,
|
||||
|
||||
shard,
|
||||
req,
|
||||
} => {
|
||||
@@ -1448,7 +1420,7 @@ impl PageServerHandler {
|
||||
self.handle_get_rel_exists_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
.map(|msg| (msg, ))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1456,7 +1428,7 @@ impl PageServerHandler {
|
||||
}
|
||||
BatchedFeMessage::Nblocks {
|
||||
span,
|
||||
timer,
|
||||
|
||||
shard,
|
||||
req,
|
||||
} => {
|
||||
@@ -1467,7 +1439,7 @@ impl PageServerHandler {
|
||||
self.handle_get_nblocks_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
.map(|msg| (msg, ))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1503,7 +1475,6 @@ impl PageServerHandler {
|
||||
}
|
||||
BatchedFeMessage::DbSize {
|
||||
span,
|
||||
timer,
|
||||
shard,
|
||||
req,
|
||||
} => {
|
||||
@@ -1514,7 +1485,7 @@ impl PageServerHandler {
|
||||
self.handle_db_size_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
.map(|msg| (msg, ))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1522,7 +1493,6 @@ impl PageServerHandler {
|
||||
}
|
||||
BatchedFeMessage::GetSlruSegment {
|
||||
span,
|
||||
timer,
|
||||
shard,
|
||||
req,
|
||||
} => {
|
||||
@@ -1533,7 +1503,7 @@ impl PageServerHandler {
|
||||
self.handle_get_slru_segment_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
.map(|msg| (msg, ))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -2179,15 +2149,11 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
io_concurrency: IoConcurrency,
|
||||
batch_break_reason: GetPageBatchBreakReason,
|
||||
_batch_break_reason: GetPageBatchBreakReason,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
) -> Vec<Result<(PagestreamBeMessage, ), BatchedPageStreamError>> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
timeline
|
||||
.query_metrics
|
||||
.observe_getpage_batch_start(requests.len(), batch_break_reason);
|
||||
|
||||
// If a page trace is running, submit an event for this request.
|
||||
if let Some(page_trace) = timeline.page_trace.load().as_ref() {
|
||||
let time = SystemTime::now();
|
||||
@@ -2287,7 +2253,7 @@ impl PageServerHandler {
|
||||
req: req.req,
|
||||
page,
|
||||
}),
|
||||
req.timer,
|
||||
|
||||
)
|
||||
})
|
||||
.map_err(|e| BatchedPageStreamError {
|
||||
@@ -2332,7 +2298,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
requests: Vec<BatchedTestRequest>,
|
||||
_ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
) -> Vec<Result<(PagestreamBeMessage,), BatchedPageStreamError>> {
|
||||
// real requests would do something with the timeline
|
||||
let mut results = Vec::with_capacity(requests.len());
|
||||
for _req in requests.iter() {
|
||||
@@ -2358,7 +2324,6 @@ impl PageServerHandler {
|
||||
PagestreamBeMessage::Test(models::PagestreamTestResponse {
|
||||
req: req.req.clone(),
|
||||
}),
|
||||
req.timer,
|
||||
)
|
||||
})
|
||||
.map_err(|e| BatchedPageStreamError {
|
||||
@@ -2913,12 +2878,7 @@ where
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
let command_kind = match protocol_version {
|
||||
PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
|
||||
PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
|
||||
};
|
||||
COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
|
||||
|
||||
|
||||
self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
|
||||
.await?;
|
||||
}
|
||||
@@ -2935,10 +2895,7 @@ where
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::Basebackup)
|
||||
.inc();
|
||||
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
|
||||
|
||||
let res = async {
|
||||
self.handle_basebackup_request(
|
||||
pgb,
|
||||
@@ -2956,7 +2913,7 @@ where
|
||||
Result::<(), QueryError>::Ok(())
|
||||
}
|
||||
.await;
|
||||
metric_recording.observe(&res);
|
||||
|
||||
res?;
|
||||
}
|
||||
// same as basebackup, but result includes relational data as well
|
||||
@@ -2972,9 +2929,7 @@ where
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::Fullbackup)
|
||||
.inc();
|
||||
|
||||
|
||||
// Check that the timeline exists
|
||||
self.handle_basebackup_request(
|
||||
@@ -3008,9 +2963,7 @@ where
|
||||
|
||||
self.check_permission(Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::LeaseLsn)
|
||||
.inc();
|
||||
|
||||
|
||||
match self
|
||||
.handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
|
||||
|
||||
@@ -36,15 +36,13 @@ use tracing::{debug, info, info_span, trace, warn};
|
||||
use utils::bin_ser::{BeSer, DeserializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pausable_failpoint;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
use wal_decoder::serialized_batch::SerializedValueBatch ;
|
||||
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext};
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
};
|
||||
|
||||
use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
|
||||
@@ -1032,19 +1030,16 @@ impl Timeline {
|
||||
)
|
||||
.await?;
|
||||
let mut result = HashMap::new();
|
||||
let mut sz = 0;
|
||||
|
||||
for (_, v) in kv {
|
||||
let v = v?;
|
||||
let v = aux_file::decode_file_value_bytes(&v)
|
||||
.context("value decode")
|
||||
.map_err(PageReconstructError::Other)?;
|
||||
for (fname, content) in v {
|
||||
sz += fname.len();
|
||||
sz += content.len();
|
||||
result.insert(fname, content);
|
||||
}
|
||||
}
|
||||
self.aux_file_size_estimator.on_initial(sz);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
@@ -1315,12 +1310,12 @@ impl Timeline {
|
||||
let rel_size_cache = self.rel_size_cache.read().unwrap();
|
||||
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
|
||||
if lsn >= *cached_lsn {
|
||||
RELSIZE_CACHE_HITS.inc();
|
||||
|
||||
return Some(*nblocks);
|
||||
}
|
||||
RELSIZE_CACHE_MISSES_OLD.inc();
|
||||
|
||||
}
|
||||
RELSIZE_CACHE_MISSES.inc();
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
@@ -1345,25 +1340,21 @@ impl Timeline {
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Store cached relation size
|
||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
pub fn set_cached_rel_size(&self, _tag: RelTag, _lsn: Lsn, _nblocks: BlockNumber) {
|
||||
|
||||
|
||||
}
|
||||
|
||||
/// Remove cached relation size
|
||||
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
if rel_size_cache.map.remove(tag).is_some() {
|
||||
RELSIZE_CACHE_ENTRIES.dec();
|
||||
}
|
||||
pub fn remove_cached_rel_size(&self, _tag: &RelTag) {
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1438,25 +1429,7 @@ impl DatadirModification<'_> {
|
||||
.is_some_and(|b| b.has_data())
|
||||
}
|
||||
|
||||
/// Returns statistics about the currently pending modifications.
|
||||
pub(crate) fn stats(&self) -> DatadirModificationStats {
|
||||
let mut stats = DatadirModificationStats::default();
|
||||
for (_, _, value) in self.pending_metadata_pages.values().flatten() {
|
||||
match value {
|
||||
Value::Image(_) => stats.metadata_images += 1,
|
||||
Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
|
||||
Value::WalRecord(_) => stats.metadata_deltas += 1,
|
||||
}
|
||||
}
|
||||
for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
|
||||
match valuemeta {
|
||||
ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
|
||||
ValueMeta::Serialized(_) => stats.data_deltas += 1,
|
||||
ValueMeta::Observed(_) => {}
|
||||
}
|
||||
}
|
||||
stats
|
||||
}
|
||||
|
||||
|
||||
/// Set the current lsn
|
||||
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
|
||||
@@ -2331,20 +2304,15 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
let mut new_files = other_files;
|
||||
match (modifying_file, content.is_empty()) {
|
||||
(Some(old_content), false) => {
|
||||
self.tline
|
||||
.aux_file_size_estimator
|
||||
.on_update(old_content.len(), content.len());
|
||||
(Some(_old_content), false) => {
|
||||
|
||||
new_files.push((path, content));
|
||||
}
|
||||
(Some(old_content), true) => {
|
||||
self.tline
|
||||
.aux_file_size_estimator
|
||||
.on_remove(old_content.len());
|
||||
(Some(_old_content), true) => {
|
||||
|
||||
// not adding the file key to the final `new_files` vec.
|
||||
}
|
||||
(None, false) => {
|
||||
self.tline.aux_file_size_estimator.on_add(content.len());
|
||||
new_files.push((path, content));
|
||||
}
|
||||
// Compute may request delete of old version of pgstat AUX file if new one exceeds size limit.
|
||||
|
||||
@@ -83,11 +83,6 @@ use crate::context::RequestContextBuilder;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
|
||||
use crate::l0_flush::L0FlushGlobalState;
|
||||
use crate::metrics::{
|
||||
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
|
||||
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_STATE_METRIC,
|
||||
TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
|
||||
};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::LocationMode;
|
||||
use crate::tenant::gc_result::GcResult;
|
||||
@@ -1358,7 +1353,7 @@ impl Tenant {
|
||||
let starting_up = init_order.is_some();
|
||||
scopeguard::defer! {
|
||||
if starting_up {
|
||||
TENANT.startup_complete.inc();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1461,7 +1456,7 @@ impl Tenant {
|
||||
|
||||
let preload = match &mode {
|
||||
SpawnMode::Eager | SpawnMode::Lazy => {
|
||||
let _preload_timer = TENANT.preload.start_timer();
|
||||
|
||||
let res = tenant_clone
|
||||
.preload(&remote_storage, task_mgr::shutdown_token())
|
||||
.await;
|
||||
@@ -1483,7 +1478,7 @@ impl Tenant {
|
||||
// We will time the duration of the attach phase unless this is a creation (attach will do no work)
|
||||
let attach_start = std::time::Instant::now();
|
||||
let attached = {
|
||||
let _attach_timer = Some(TENANT.attach.start_timer());
|
||||
|
||||
tenant_clone.attach(preload, &ctx).await
|
||||
};
|
||||
let attach_duration = attach_start.elapsed();
|
||||
@@ -3185,7 +3180,7 @@ impl Tenant {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.success(&CIRCUIT_BREAKERS_UNBROKEN);
|
||||
.success();
|
||||
|
||||
match has_pending {
|
||||
true => Ok(CompactionOutcome::Pending),
|
||||
@@ -3206,13 +3201,13 @@ impl Tenant {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
.fail( err);
|
||||
}
|
||||
CompactionError::Other(err) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
.fail( err);
|
||||
}
|
||||
CompactionError::AlreadyRunning(_) => {}
|
||||
}
|
||||
@@ -3392,7 +3387,7 @@ impl Tenant {
|
||||
"activation attempt finished"
|
||||
);
|
||||
|
||||
TENANT.activation.observe(elapsed.as_secs_f64());
|
||||
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -3517,7 +3512,6 @@ impl Tenant {
|
||||
// Wait for any in-flight operations to complete
|
||||
self.gate.close().await;
|
||||
|
||||
remove_tenant_metrics(&self.tenant_shard_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -3850,33 +3844,13 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub(crate) fn get_sizes(&self) -> TopTenantShardItem {
|
||||
let mut result = TopTenantShardItem {
|
||||
TopTenantShardItem {
|
||||
id: self.tenant_shard_id,
|
||||
resident_size: 0,
|
||||
physical_size: 0,
|
||||
max_logical_size: 0,
|
||||
max_logical_size_per_shard: 0,
|
||||
};
|
||||
|
||||
for timeline in self.timelines.lock().unwrap().values() {
|
||||
result.resident_size += timeline.metrics.resident_physical_size_gauge.get();
|
||||
|
||||
result.physical_size += timeline
|
||||
.remote_client
|
||||
.metrics
|
||||
.remote_physical_size_gauge
|
||||
.get();
|
||||
result.max_logical_size = std::cmp::max(
|
||||
result.max_logical_size,
|
||||
timeline.metrics.current_logical_size_gauge.get(),
|
||||
);
|
||||
}
|
||||
|
||||
result.max_logical_size_per_shard = result
|
||||
.max_logical_size
|
||||
.div_ceil(self.tenant_shard_id.shard_count.count() as u64);
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4243,55 +4217,15 @@ impl Tenant {
|
||||
let (state, mut rx) = watch::channel(state);
|
||||
|
||||
tokio::spawn(async move {
|
||||
// reflect tenant state in metrics:
|
||||
// - global per tenant state: TENANT_STATE_METRIC
|
||||
// - "set" of broken tenants: BROKEN_TENANTS_SET
|
||||
//
|
||||
// set of broken tenants should not have zero counts so that it remains accessible for
|
||||
// alerting.
|
||||
|
||||
let tid = tenant_shard_id.to_string();
|
||||
let shard_id = tenant_shard_id.shard_slug().to_string();
|
||||
let set_key = &[tid.as_str(), shard_id.as_str()][..];
|
||||
|
||||
fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
|
||||
([state.into()], matches!(state, TenantState::Broken { .. }))
|
||||
}
|
||||
|
||||
let mut tuple = inspect_state(&rx.borrow_and_update());
|
||||
|
||||
let is_broken = tuple.1;
|
||||
let mut counted_broken = if is_broken {
|
||||
// add the id to the set right away, there should not be any updates on the channel
|
||||
// after before tenant is removed, if ever
|
||||
BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
|
||||
loop {
|
||||
let labels = &tuple.0;
|
||||
let current = TENANT_STATE_METRIC.with_label_values(labels);
|
||||
current.inc();
|
||||
|
||||
|
||||
if rx.changed().await.is_err() {
|
||||
// tenant has been dropped
|
||||
current.dec();
|
||||
drop(BROKEN_TENANTS_SET.remove_label_values(set_key));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
current.dec();
|
||||
tuple = inspect_state(&rx.borrow_and_update());
|
||||
|
||||
let is_broken = tuple.1;
|
||||
if is_broken && !counted_broken {
|
||||
counted_broken = true;
|
||||
// insert the tenant_id (back) into the set while avoiding needless counter
|
||||
// access
|
||||
BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -4666,10 +4600,6 @@ impl Tenant {
|
||||
let now = SystemTime::now();
|
||||
target.leases.retain(|_, lease| !lease.is_expired(&now));
|
||||
|
||||
timeline
|
||||
.metrics
|
||||
.valid_lsn_lease_count_gauge
|
||||
.set(target.leases.len() as u64);
|
||||
|
||||
// Look up parent's PITR cutoff to update the child's knowledge of whether it is within parent's PITR
|
||||
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
|
||||
@@ -4679,22 +4609,6 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// Update metrics that depend on GC state
|
||||
timeline
|
||||
.metrics
|
||||
.archival_size
|
||||
.set(if target.within_ancestor_pitr {
|
||||
timeline.metrics.current_logical_size_gauge.get()
|
||||
} else {
|
||||
0
|
||||
});
|
||||
timeline.metrics.pitr_history_size.set(
|
||||
timeline
|
||||
.get_last_record_lsn()
|
||||
.checked_sub(target.cutoffs.time)
|
||||
.unwrap_or(Lsn(0))
|
||||
.0,
|
||||
);
|
||||
|
||||
// Apply the cutoffs we found to the Timeline's GcInfo. Why might we _not_ have cutoffs for a timeline?
|
||||
// - this timeline was created while we were finding cutoffs
|
||||
@@ -5444,10 +5358,6 @@ impl Tenant {
|
||||
// Only shard zero should be calculating synthetic sizes
|
||||
debug_assert!(self.shard_identity.is_shard_zero());
|
||||
|
||||
TENANT_SYNTHETIC_SIZE_METRIC
|
||||
.get_metric_with_label_values(&[&self.tenant_shard_id.tenant_id.to_string()])
|
||||
.unwrap()
|
||||
.set(size);
|
||||
}
|
||||
|
||||
pub fn cached_synthetic_size(&self) -> u64 {
|
||||
@@ -5524,16 +5434,7 @@ impl Tenant {
|
||||
/// than they report here, due to layer eviction. Tenants with many active branches may
|
||||
/// actually use more than they report here.
|
||||
pub(crate) fn local_storage_wanted(&self) -> u64 {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
|
||||
// Heuristic: we use the max() of the timelines' visible sizes, rather than the sum. This
|
||||
// reflects the observation that on tenants with multiple large branches, typically only one
|
||||
// of them is used actively enough to occupy space on disk.
|
||||
timelines
|
||||
.values()
|
||||
.map(|t| t.metrics.visible_physical_size_gauge.get())
|
||||
.max()
|
||||
.unwrap_or(0)
|
||||
1000
|
||||
}
|
||||
|
||||
/// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant
|
||||
@@ -5617,16 +5518,11 @@ async fn run_initdb(
|
||||
);
|
||||
|
||||
let _permit = {
|
||||
let _timer = INITDB_SEMAPHORE_ACQUISITION_TIME.start_timer();
|
||||
|
||||
INIT_DB_SEMAPHORE.acquire().await
|
||||
};
|
||||
|
||||
CONCURRENT_INITDBS.inc();
|
||||
scopeguard::defer! {
|
||||
CONCURRENT_INITDBS.dec();
|
||||
}
|
||||
|
||||
let _timer = INITDB_RUN_TIME.start_timer();
|
||||
let res = postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
|
||||
superuser: &conf.superuser,
|
||||
locale: &conf.locale,
|
||||
|
||||
@@ -44,7 +44,7 @@ use crate::controller_upcall_client::{
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
|
||||
use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
|
||||
use crate::metrics::TENANT_MANAGER as METRICS;
|
||||
use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind};
|
||||
use crate::tenant::config::{
|
||||
AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
|
||||
@@ -519,7 +519,7 @@ pub async fn init_tenant_mgr(
|
||||
tenant_configs.len(),
|
||||
conf.concurrent_tenant_warmup.initial_permits()
|
||||
);
|
||||
TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
|
||||
|
||||
|
||||
// Accumulate futures for writing tenant configs, so that we can execute in parallel
|
||||
let mut config_write_futs = Vec::new();
|
||||
@@ -2177,9 +2177,7 @@ impl TenantManager {
|
||||
// we would use if not doing any eviction.
|
||||
progress.bytes_total
|
||||
} else {
|
||||
// In the absence of heatmap info, assume that the secondary location simply
|
||||
// needs as much space as it is currently using.
|
||||
secondary.resident_size_metric.get()
|
||||
42
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2530,7 +2528,7 @@ impl SlotGuard {
|
||||
Ok(())
|
||||
}
|
||||
None => {
|
||||
METRICS.unexpected_errors.inc();
|
||||
|
||||
error!(
|
||||
tenant_shard_id = %self.tenant_shard_id,
|
||||
"Missing InProgress marker during tenant upsert, this is a bug."
|
||||
@@ -2540,7 +2538,7 @@ impl SlotGuard {
|
||||
))
|
||||
}
|
||||
Some(slot) => {
|
||||
METRICS.unexpected_errors.inc();
|
||||
|
||||
error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
|
||||
Err(TenantSlotUpsertError::InternalError(
|
||||
"Unexpected contents of TenantSlot".into(),
|
||||
@@ -2621,7 +2619,7 @@ impl Drop for SlotGuard {
|
||||
match m.entry(self.tenant_shard_id) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if !matches!(entry.get(), TenantSlot::InProgress(_)) {
|
||||
METRICS.unexpected_errors.inc();
|
||||
|
||||
error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
|
||||
}
|
||||
|
||||
@@ -2636,7 +2634,7 @@ impl Drop for SlotGuard {
|
||||
}
|
||||
}
|
||||
Entry::Vacant(_) => {
|
||||
METRICS.unexpected_errors.inc();
|
||||
|
||||
error!(
|
||||
tenant_shard_id = %self.tenant_shard_id,
|
||||
"Missing InProgress marker during SlotGuard drop, this is a bug."
|
||||
@@ -2696,7 +2694,7 @@ fn tenant_map_acquire_slot_impl(
|
||||
mode: TenantSlotAcquireMode,
|
||||
) -> Result<SlotGuard, TenantSlotError> {
|
||||
use TenantSlotAcquireMode::*;
|
||||
METRICS.tenant_slot_writes.inc();
|
||||
|
||||
|
||||
let mut locked = tenants.write().unwrap();
|
||||
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
|
||||
|
||||
@@ -223,9 +223,8 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
|
||||
use crate::metrics::{
|
||||
MeasureRemoteOp, REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
|
||||
RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
|
||||
RemoteTimelineClientMetricsCallTrackSize,
|
||||
MeasureRemoteOp,
|
||||
RemoteOpFileKind, RemoteOpKind,
|
||||
};
|
||||
use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind, shutdown_token};
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
@@ -357,8 +356,6 @@ pub(crate) struct RemoteTimelineClient {
|
||||
|
||||
upload_queue: Mutex<UploadQueue>,
|
||||
|
||||
pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
|
||||
|
||||
storage_impl: GenericRemoteStorage,
|
||||
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
@@ -405,10 +402,6 @@ impl RemoteTimelineClient {
|
||||
storage_impl: remote_storage,
|
||||
deletion_queue_client,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&tenant_shard_id,
|
||||
&timeline_id,
|
||||
)),
|
||||
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
|
||||
cancel: CancellationToken::new(),
|
||||
}
|
||||
@@ -597,21 +590,13 @@ impl RemoteTimelineClient {
|
||||
.map_err(|_| UploadQueueNotReadyError)
|
||||
}
|
||||
|
||||
fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
|
||||
let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
|
||||
current_remote_index_part
|
||||
.layer_metadata
|
||||
.values()
|
||||
.map(|ilmd| ilmd.file_size)
|
||||
.sum()
|
||||
} else {
|
||||
0
|
||||
};
|
||||
self.metrics.remote_physical_size_gauge.set(size);
|
||||
fn update_remote_physical_size_gauge(&self, _current_remote_index_part: Option<&IndexPart>) {
|
||||
|
||||
|
||||
}
|
||||
|
||||
pub fn get_remote_physical_size(&self) -> u64 {
|
||||
self.metrics.remote_physical_size_gauge.get()
|
||||
0
|
||||
}
|
||||
|
||||
//
|
||||
@@ -626,13 +611,6 @@ impl RemoteTimelineClient {
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<MaybeDeletedIndexPart, DownloadError> {
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Index,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
|
||||
let (index_part, index_generation, index_last_modified) = download::download_index_part(
|
||||
&self.storage_impl,
|
||||
@@ -645,7 +623,7 @@ impl RemoteTimelineClient {
|
||||
Option::<TaskKind>::None,
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -720,13 +698,7 @@ impl RemoteTimelineClient {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, DownloadError> {
|
||||
let downloaded_size = {
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Layer,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
|
||||
download::download_layer_file(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
@@ -743,13 +715,11 @@ impl RemoteTimelineClient {
|
||||
Some(ctx.task_kind()),
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
|
||||
REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
|
||||
|
||||
Ok(downloaded_size)
|
||||
}
|
||||
@@ -1027,7 +997,6 @@ impl RemoteTimelineClient {
|
||||
let op = UploadOp::UploadMetadata {
|
||||
uploaded: Box::new(index_part.clone()),
|
||||
};
|
||||
self.metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
|
||||
|
||||
@@ -1265,7 +1234,6 @@ impl RemoteTimelineClient {
|
||||
);
|
||||
|
||||
let op = UploadOp::UploadLayer(layer, metadata, None);
|
||||
self.metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
}
|
||||
|
||||
@@ -1442,7 +1410,6 @@ impl RemoteTimelineClient {
|
||||
let op = UploadOp::Delete(Delete {
|
||||
layers: with_metadata,
|
||||
});
|
||||
self.metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
}
|
||||
|
||||
@@ -2180,7 +2147,7 @@ impl RemoteTimelineClient {
|
||||
Some(TaskKind::RemoteUploadTask),
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
Arc::clone(&self.metrics),
|
||||
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -2197,7 +2164,7 @@ impl RemoteTimelineClient {
|
||||
Some(TaskKind::RemoteUploadTask),
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Upload,
|
||||
Arc::clone(&self.metrics),
|
||||
|
||||
)
|
||||
.await;
|
||||
if res.is_ok() {
|
||||
@@ -2343,10 +2310,7 @@ impl RemoteTimelineClient {
|
||||
upload_queue.clean.1 = Some(task.task_id);
|
||||
|
||||
let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
|
||||
self.metrics
|
||||
.projected_remote_consistent_lsn_gauge
|
||||
.set(lsn.0);
|
||||
|
||||
|
||||
if self.generation.is_none() {
|
||||
// Legacy mode: skip validating generation
|
||||
upload_queue.visible_remote_consistent_lsn.store(lsn);
|
||||
@@ -2387,64 +2351,6 @@ impl RemoteTimelineClient {
|
||||
.await;
|
||||
}
|
||||
|
||||
self.metric_end(&task.op);
|
||||
for coalesced_op in &task.coalesced_ops {
|
||||
self.metric_end(coalesced_op);
|
||||
}
|
||||
}
|
||||
|
||||
fn metric_impl(
|
||||
&self,
|
||||
op: &UploadOp,
|
||||
) -> Option<(
|
||||
RemoteOpFileKind,
|
||||
RemoteOpKind,
|
||||
RemoteTimelineClientMetricsCallTrackSize,
|
||||
)> {
|
||||
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
|
||||
let res = match op {
|
||||
UploadOp::UploadLayer(_, m, _) => (
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
|
||||
),
|
||||
UploadOp::UploadMetadata { .. } => (
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Upload,
|
||||
DontTrackSize {
|
||||
reason: "metadata uploads are tiny",
|
||||
},
|
||||
),
|
||||
UploadOp::Delete(_delete) => (
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Delete,
|
||||
DontTrackSize {
|
||||
reason: "should we track deletes? positive or negative sign?",
|
||||
},
|
||||
),
|
||||
UploadOp::Barrier(..) | UploadOp::Shutdown => {
|
||||
// we do not account these
|
||||
return None;
|
||||
}
|
||||
};
|
||||
Some(res)
|
||||
}
|
||||
|
||||
fn metric_begin(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
|
||||
guard.will_decrement_manually(); // in metric_end(), see right below
|
||||
}
|
||||
|
||||
fn metric_end(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
|
||||
}
|
||||
|
||||
/// Close the upload queue for new operations and cancel queued operations.
|
||||
@@ -2524,7 +2430,6 @@ impl RemoteTimelineClient {
|
||||
|
||||
// Tear down queued ops
|
||||
for op in qi.queued_operations.into_iter() {
|
||||
self.metric_end(&op);
|
||||
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
|
||||
// which is exactly what we want to happen.
|
||||
drop(op);
|
||||
@@ -2834,10 +2739,6 @@ mod tests {
|
||||
storage_impl: self.harness.remote_storage.clone(),
|
||||
deletion_queue_client: self.harness.deletion_queue.new_client(),
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&self.harness.tenant_shard_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
|
||||
cancel: CancellationToken::new(),
|
||||
})
|
||||
@@ -3064,99 +2965,7 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn bytes_unfinished_gauge_for_layer_file_uploads() {
|
||||
// Setup
|
||||
|
||||
let TestSetup {
|
||||
harness,
|
||||
tenant: _tenant,
|
||||
timeline,
|
||||
..
|
||||
} = TestSetup::new("metrics").await.unwrap();
|
||||
let client = &timeline.remote_client;
|
||||
|
||||
let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let local_path = local_layer_path(
|
||||
harness.conf,
|
||||
&timeline.tenant_shard_id,
|
||||
&timeline.timeline_id,
|
||||
&layer_file_name_1,
|
||||
&harness.generation,
|
||||
);
|
||||
let content_1 = dummy_contents("foo");
|
||||
std::fs::write(&local_path, &content_1).unwrap();
|
||||
|
||||
let layer_file_1 = Layer::for_resident(
|
||||
harness.conf,
|
||||
&timeline,
|
||||
local_path,
|
||||
layer_file_name_1.clone(),
|
||||
LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
|
||||
);
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
struct BytesStartedFinished {
|
||||
started: Option<usize>,
|
||||
finished: Option<usize>,
|
||||
}
|
||||
impl std::ops::Add for BytesStartedFinished {
|
||||
type Output = Self;
|
||||
fn add(self, rhs: Self) -> Self::Output {
|
||||
Self {
|
||||
started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
|
||||
finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
|
||||
}
|
||||
}
|
||||
}
|
||||
let get_bytes_started_stopped = || {
|
||||
let started = client
|
||||
.metrics
|
||||
.get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
let stopped = client
|
||||
.metrics
|
||||
.get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
BytesStartedFinished {
|
||||
started,
|
||||
finished: stopped,
|
||||
}
|
||||
};
|
||||
|
||||
// Test
|
||||
tracing::info!("now doing actual test");
|
||||
|
||||
let actual_a = get_bytes_started_stopped();
|
||||
|
||||
client
|
||||
.schedule_layer_file_upload(layer_file_1.clone())
|
||||
.unwrap();
|
||||
|
||||
let actual_b = get_bytes_started_stopped();
|
||||
|
||||
client.wait_completion().await.unwrap();
|
||||
|
||||
let actual_c = get_bytes_started_stopped();
|
||||
|
||||
// Validate
|
||||
|
||||
let expected_b = actual_a
|
||||
+ BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
// assert that the _finished metric is created eagerly so that subtractions work on first sample
|
||||
finished: Some(0),
|
||||
};
|
||||
assert_eq!(actual_b, expected_b);
|
||||
|
||||
let expected_c = actual_a
|
||||
+ BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
finished: Some(content_1.len()),
|
||||
};
|
||||
assert_eq!(actual_c, expected_c);
|
||||
}
|
||||
|
||||
|
||||
async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
|
||||
// An empty IndexPart, just sufficient to ensure deserialization will succeed
|
||||
let example_index_part = IndexPart::example();
|
||||
|
||||
@@ -6,7 +6,6 @@ mod scheduler;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use metrics::UIntGauge;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
@@ -26,7 +25,6 @@ use super::span::debug_assert_current_span_has_tenant_id;
|
||||
use super::storage_layer::LayerName;
|
||||
use crate::context::RequestContext;
|
||||
use crate::disk_usage_eviction_task::DiskUsageEvictionInfo;
|
||||
use crate::metrics::{SECONDARY_HEATMAP_TOTAL_SIZE, SECONDARY_RESIDENT_PHYSICAL_SIZE};
|
||||
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TaskKind};
|
||||
|
||||
enum DownloadCommand {
|
||||
@@ -109,12 +107,7 @@ pub(crate) struct SecondaryTenant {
|
||||
|
||||
// Public state indicating overall progress of downloads relative to the last heatmap seen
|
||||
pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,
|
||||
|
||||
// Sum of layer sizes on local disk
|
||||
pub(super) resident_size_metric: UIntGauge,
|
||||
|
||||
// Sum of layer sizes in the most recently downloaded heatmap
|
||||
pub(super) heatmap_total_size_metric: UIntGauge,
|
||||
|
||||
}
|
||||
|
||||
impl SecondaryTenant {
|
||||
@@ -124,16 +117,8 @@ impl SecondaryTenant {
|
||||
tenant_conf: pageserver_api::models::TenantConfig,
|
||||
config: &SecondaryLocationConfig,
|
||||
) -> Arc<Self> {
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", tenant_shard_id.shard_slug());
|
||||
let resident_size_metric = SECONDARY_RESIDENT_PHYSICAL_SIZE
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id])
|
||||
.unwrap();
|
||||
|
||||
let heatmap_total_size_metric = SECONDARY_HEATMAP_TOTAL_SIZE
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id])
|
||||
.unwrap();
|
||||
|
||||
|
||||
|
||||
Arc::new(Self {
|
||||
tenant_shard_id,
|
||||
// todo: shall we make this a descendent of the
|
||||
@@ -150,14 +135,10 @@ impl SecondaryTenant {
|
||||
|
||||
progress: std::sync::Mutex::default(),
|
||||
|
||||
resident_size_metric,
|
||||
heatmap_total_size_metric,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
|
||||
self.tenant_shard_id
|
||||
}
|
||||
|
||||
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
self.cancel.cancel();
|
||||
@@ -169,15 +150,10 @@ impl SecondaryTenant {
|
||||
|
||||
// Metrics are subtracted from and/or removed eagerly.
|
||||
// Deletions are done in the background via [`BackgroundPurges::spawn`].
|
||||
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
|
||||
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
|
||||
self.detail
|
||||
.lock()
|
||||
.unwrap()
|
||||
.drain_timelines(&self.tenant_shard_id, &self.resident_size_metric);
|
||||
.drain_timelines(&self.tenant_shard_id);
|
||||
}
|
||||
|
||||
pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
|
||||
@@ -255,7 +231,7 @@ impl SecondaryTenant {
|
||||
// of the cache.
|
||||
let mut detail = this.detail.lock().unwrap();
|
||||
if let Some(removed) =
|
||||
detail.evict_layer(name, &timeline_id, now, &this.resident_size_metric)
|
||||
detail.evict_layer(name, &timeline_id, now)
|
||||
{
|
||||
// We might race with removal of the same layer during downloads, so finding the layer we
|
||||
// were trying to remove is optional. Only issue the disk I/O to remove it if we found it.
|
||||
@@ -269,10 +245,9 @@ impl SecondaryTenant {
|
||||
/// Exhaustive check that incrementally updated metrics match the actual state.
|
||||
#[cfg(feature = "testing")]
|
||||
fn validate_metrics(&self) {
|
||||
let detail = self.detail.lock().unwrap();
|
||||
let resident_size = detail.total_resident_size();
|
||||
|
||||
|
||||
assert_eq!(resident_size, self.resident_size_metric.get());
|
||||
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "testing"))]
|
||||
|
||||
@@ -4,11 +4,9 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::metrics::{STORAGE_IO_SIZE, StorageIoSizeOperation};
|
||||
use camino::Utf8PathBuf;
|
||||
use chrono::format::{DelayedFormat, StrftimeItems};
|
||||
use futures::Future;
|
||||
use metrics::UIntGauge;
|
||||
use pageserver_api::models::SecondaryProgress;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{DownloadError, DownloadKind, DownloadOpts, Etag, GenericRemoteStorage};
|
||||
@@ -33,7 +31,6 @@ use crate::context::RequestContext;
|
||||
use crate::disk_usage_eviction_task::{
|
||||
DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer, finite_f32,
|
||||
};
|
||||
use crate::metrics::SECONDARY_MODE;
|
||||
use crate::tenant::config::SecondaryLocationConfig;
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::ephemeral_file::is_ephemeral_file;
|
||||
@@ -120,9 +117,6 @@ impl OnDiskState {
|
||||
.fatal_err("Deleting secondary layer")
|
||||
}
|
||||
|
||||
pub(crate) fn file_size(&self) -> u64 {
|
||||
self.metadata.file_size
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct SecondaryDetailTimeline {
|
||||
@@ -175,13 +169,9 @@ impl SecondaryDetailTimeline {
|
||||
pub(super) fn remove_layer(
|
||||
&mut self,
|
||||
name: &LayerName,
|
||||
resident_metric: &UIntGauge,
|
||||
) -> Option<OnDiskState> {
|
||||
let removed = self.on_disk_layers.remove(name);
|
||||
if let Some(removed) = &removed {
|
||||
resident_metric.sub(removed.file_size());
|
||||
}
|
||||
removed
|
||||
self.on_disk_layers.remove(name)
|
||||
|
||||
}
|
||||
|
||||
/// `local_path`
|
||||
@@ -191,7 +181,6 @@ impl SecondaryDetailTimeline {
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
touched: &HeatMapLayer,
|
||||
resident_metric: &UIntGauge,
|
||||
local_path: F,
|
||||
) where
|
||||
F: FnOnce() -> Utf8PathBuf,
|
||||
@@ -211,7 +200,6 @@ impl SecondaryDetailTimeline {
|
||||
touched.access_time,
|
||||
local_path(),
|
||||
));
|
||||
resident_metric.add(touched.metadata.file_size);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -267,28 +255,16 @@ impl SecondaryDetail {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
pub(crate) fn total_resident_size(&self) -> u64 {
|
||||
self.timelines
|
||||
.values()
|
||||
.map(|tl| {
|
||||
tl.on_disk_layers
|
||||
.values()
|
||||
.map(|v| v.metadata.file_size)
|
||||
.sum::<u64>()
|
||||
})
|
||||
.sum::<u64>()
|
||||
}
|
||||
|
||||
pub(super) fn evict_layer(
|
||||
&mut self,
|
||||
name: LayerName,
|
||||
timeline_id: &TimelineId,
|
||||
now: SystemTime,
|
||||
resident_metric: &UIntGauge,
|
||||
|
||||
) -> Option<OnDiskState> {
|
||||
let timeline = self.timelines.get_mut(timeline_id)?;
|
||||
let removed = timeline.remove_layer(&name, resident_metric);
|
||||
let removed = timeline.remove_layer(&name);
|
||||
if removed.is_some() {
|
||||
timeline.evicted_at.insert(name, now);
|
||||
}
|
||||
@@ -297,52 +273,21 @@ impl SecondaryDetail {
|
||||
|
||||
pub(super) fn remove_timeline(
|
||||
&mut self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
_tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
let removed = self.timelines.remove(timeline_id);
|
||||
if let Some(removed) = removed {
|
||||
Self::clear_timeline_metrics(tenant_shard_id, timeline_id, removed, resident_metric);
|
||||
}
|
||||
self.timelines.remove(timeline_id);
|
||||
|
||||
}
|
||||
|
||||
pub(super) fn drain_timelines(
|
||||
&mut self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
resident_metric: &UIntGauge,
|
||||
_tenant_shard_id: &TenantShardId,
|
||||
|
||||
) {
|
||||
for (timeline_id, removed) in self.timelines.drain() {
|
||||
Self::clear_timeline_metrics(tenant_shard_id, &timeline_id, removed, resident_metric);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fn clear_timeline_metrics(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
detail: SecondaryDetailTimeline,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
resident_metric.sub(
|
||||
detail
|
||||
.on_disk_layers
|
||||
.values()
|
||||
.map(|l| l.metadata.file_size)
|
||||
.sum(),
|
||||
);
|
||||
|
||||
let shard_id = format!("{}", tenant_shard_id.shard_slug());
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let timeline_id = timeline_id.to_string();
|
||||
for op in StorageIoSizeOperation::VARIANTS {
|
||||
let _ = STORAGE_IO_SIZE.remove_label_values(&[
|
||||
op,
|
||||
tenant_id.as_str(),
|
||||
shard_id.as_str(),
|
||||
timeline_id.as_str(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
/// Additionally returns the total number of layers, used for more stable relative access time
|
||||
/// based eviction.
|
||||
@@ -797,7 +742,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
tenant_shard_id,
|
||||
last_heatmap,
|
||||
timeline,
|
||||
&self.secondary_state.resident_size_metric,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
@@ -920,11 +864,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
bytes_downloaded: 0,
|
||||
};
|
||||
|
||||
// Also expose heatmap bytes_total as a metric
|
||||
self.secondary_state
|
||||
.heatmap_total_size_metric
|
||||
.set(heatmap_stats.bytes);
|
||||
|
||||
|
||||
// Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
|
||||
let mut delete_layers = Vec::new();
|
||||
let mut delete_timelines = Vec::new();
|
||||
@@ -991,7 +931,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
detail.remove_timeline(
|
||||
self.secondary_state.get_tenant_shard_id(),
|
||||
delete_timeline,
|
||||
&self.secondary_state.resident_size_metric,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1010,7 +949,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
|
||||
continue;
|
||||
};
|
||||
timeline_state.remove_layer(&layer_name, &self.secondary_state.resident_size_metric);
|
||||
timeline_state.remove_layer(&layer_name);
|
||||
}
|
||||
|
||||
for timeline_id in delete_timelines {
|
||||
@@ -1077,7 +1016,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
.await
|
||||
.ok_or_else(|| UpdateError::Cancelled)
|
||||
.and_then(|x| x)
|
||||
.inspect(|_| SECONDARY_MODE.download_heatmap.inc())
|
||||
.inspect(|_|{} )
|
||||
}
|
||||
|
||||
/// Download heatmap layers that are not present on local disk, or update their
|
||||
@@ -1252,7 +1191,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
tenant_shard_id,
|
||||
&timeline_id,
|
||||
&t,
|
||||
&self.secondary_state.resident_size_metric,
|
||||
|| {
|
||||
local_layer_path(
|
||||
self.conf,
|
||||
@@ -1364,7 +1302,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
progress.layers_downloaded += 1;
|
||||
}
|
||||
|
||||
SECONDARY_MODE.download_layer.inc();
|
||||
|
||||
Ok(Some(layer))
|
||||
}
|
||||
@@ -1376,7 +1313,6 @@ async fn init_timeline_state(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
last_heatmap: Option<&HeatMapTimeline>,
|
||||
heatmap: &HeatMapTimeline,
|
||||
resident_metric: &UIntGauge,
|
||||
ctx: &RequestContext,
|
||||
) -> SecondaryDetailTimeline {
|
||||
let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &heatmap.timeline_id);
|
||||
@@ -1480,7 +1416,6 @@ async fn init_timeline_state(
|
||||
tenant_shard_id,
|
||||
&heatmap.timeline_id,
|
||||
remote_meta,
|
||||
resident_metric,
|
||||
|| file_path,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ use super::scheduler::{
|
||||
};
|
||||
use super::{CommandRequest, SecondaryTenantError, UploadCommand};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use crate::metrics::SECONDARY_MODE;
|
||||
use crate::tenant::Tenant;
|
||||
use crate::tenant::config::AttachmentMode;
|
||||
use crate::tenant::mgr::{GetTenantError, TenantManager};
|
||||
@@ -221,14 +220,10 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
|
||||
// Guard for the barrier in [`WriteInProgress`]
|
||||
let _completion = completion;
|
||||
|
||||
let started_at = Instant::now();
|
||||
|
||||
let uploaded = match upload_tenant_heatmap(remote_storage, &tenant, last_upload.clone()).await {
|
||||
Ok(UploadHeatmapOutcome::Uploaded(uploaded)) => {
|
||||
let duration = Instant::now().duration_since(started_at);
|
||||
SECONDARY_MODE
|
||||
.upload_heatmap_duration
|
||||
.observe(duration.as_secs_f64());
|
||||
SECONDARY_MODE.upload_heatmap.inc();
|
||||
|
||||
Some(uploaded)
|
||||
}
|
||||
Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_upload,
|
||||
@@ -237,11 +232,8 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
|
||||
"Failed to upload heatmap for tenant {}: {e:#}",
|
||||
tenant.get_tenant_shard_id(),
|
||||
);
|
||||
let duration = Instant::now().duration_since(started_at);
|
||||
SECONDARY_MODE
|
||||
.upload_heatmap_duration
|
||||
.observe(duration.as_secs_f64());
|
||||
SECONDARY_MODE.upload_heatmap_errors.inc();
|
||||
|
||||
|
||||
last_upload
|
||||
}
|
||||
Err(UploadHeatmapError::Cancelled) => {
|
||||
|
||||
@@ -882,14 +882,6 @@ impl ImageLayerWriterInner {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
// Calculate compression ratio
|
||||
let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
|
||||
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
|
||||
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
|
||||
.inc_by(self.uncompressed_bytes_eligible);
|
||||
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
|
||||
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
|
||||
|
||||
let mut file = self.blob_writer.into_inner();
|
||||
|
||||
// Write out the index
|
||||
|
||||
@@ -32,7 +32,6 @@ use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
// avoid binding to Write (conflicts with std::io::Write)
|
||||
// while being able to use std::fmt::Write's methods
|
||||
use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::{OnDiskValue, OnDiskValueIo};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
@@ -307,11 +306,7 @@ impl GlobalResourceUnits {
|
||||
}
|
||||
};
|
||||
|
||||
// This is a sloppy update: concurrent updates to the counter will race, and the exact
|
||||
// value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
|
||||
// That's okay: as long as the metric contains some recent value, it doesn't have to always
|
||||
// be literally the last update.
|
||||
TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
|
||||
|
||||
|
||||
self.dirty_bytes = size;
|
||||
|
||||
|
||||
@@ -231,9 +231,7 @@ impl Layer {
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
|
||||
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_add(metadata.file_size);
|
||||
|
||||
|
||||
ResidentLayer { downloaded, owner }
|
||||
}
|
||||
@@ -526,12 +524,6 @@ impl Layer {
|
||||
}
|
||||
}
|
||||
|
||||
// Update the timeline's visible bytes count
|
||||
if let Some(tl) = self.0.timeline.upgrade() {
|
||||
tl.metrics
|
||||
.visible_physical_size_gauge
|
||||
.add(self.0.desc.file_size)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -540,23 +532,10 @@ impl Layer {
|
||||
use LayerVisibilityHint::*;
|
||||
match (old_visibility, visibility) {
|
||||
(Visible, Covered) => {
|
||||
// Subtract this layer's contribution to the visible size metric
|
||||
if let Some(tl) = self.0.timeline.upgrade() {
|
||||
debug_assert!(
|
||||
tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
|
||||
);
|
||||
tl.metrics
|
||||
.visible_physical_size_gauge
|
||||
.sub(self.0.desc.file_size)
|
||||
}
|
||||
|
||||
}
|
||||
(Covered, Visible) => {
|
||||
// Add this layer's contribution to the visible size metric
|
||||
if let Some(tl) = self.0.timeline.upgrade() {
|
||||
tl.metrics
|
||||
.visible_physical_size_gauge
|
||||
.add(self.0.desc.file_size)
|
||||
}
|
||||
|
||||
}
|
||||
(Covered, Covered) | (Visible, Visible) => {
|
||||
// no change
|
||||
@@ -609,7 +588,6 @@ impl ResidentOrWantedEvicted {
|
||||
ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
|
||||
ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
|
||||
Some(strong) => {
|
||||
LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
|
||||
|
||||
*self = ResidentOrWantedEvicted::Resident(strong.clone());
|
||||
|
||||
@@ -741,17 +719,8 @@ enum Status {
|
||||
|
||||
impl Drop for LayerInner {
|
||||
fn drop(&mut self) {
|
||||
// if there was a pending eviction, mark it cancelled here to balance metrics
|
||||
if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
|
||||
{
|
||||
// eviction has already been started
|
||||
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
|
||||
|
||||
// eviction request is intentionally not honored as no one is present to wait for it
|
||||
// and we could be delaying shutdown for nothing.
|
||||
}
|
||||
|
||||
let timeline = self.timeline.upgrade();
|
||||
|
||||
let timeline: Option<Arc<Timeline>> = self.timeline.upgrade();
|
||||
|
||||
if let Some(timeline) = timeline.as_ref() {
|
||||
// Only need to decrement metrics if the timeline still exists: otherwise
|
||||
@@ -759,13 +728,6 @@ impl Drop for LayerInner {
|
||||
timeline.metrics.dec_layer(&self.desc);
|
||||
|
||||
if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
|
||||
debug_assert!(
|
||||
timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
|
||||
);
|
||||
timeline
|
||||
.metrics
|
||||
.visible_physical_size_gauge
|
||||
.sub(self.desc.file_size);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -777,7 +739,6 @@ impl Drop for LayerInner {
|
||||
|
||||
let path = std::mem::take(&mut self.path);
|
||||
let file_name = self.layer_desc().layer_name();
|
||||
let file_size = self.layer_desc().file_size;
|
||||
let meta = self.metadata();
|
||||
let status = self.status.take();
|
||||
|
||||
@@ -786,20 +747,13 @@ impl Drop for LayerInner {
|
||||
|
||||
// carry this until we are finished for [`Layer::wait_drop`] support
|
||||
let _status = status;
|
||||
|
||||
let Some(timeline) = timeline else {
|
||||
// no need to nag that timeline is gone: under normal situation on
|
||||
// task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
return;
|
||||
};
|
||||
|
||||
let Ok(_guard) = timeline.gate.enter() else {
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
return;
|
||||
};
|
||||
|
||||
let removed = match std::fs::remove_file(path) {
|
||||
match std::fs::remove_file(path) {
|
||||
Ok(()) => true,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
// until we no longer do detaches by removing all local files before removing the
|
||||
@@ -810,34 +764,16 @@ impl Drop for LayerInner {
|
||||
// layers.
|
||||
false
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to remove wanted deleted layer: {e}");
|
||||
LAYER_IMPL_METRICS.inc_delete_removes_failed();
|
||||
Err(_e) => {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
if removed {
|
||||
timeline.metrics.resident_physical_size_sub(file_size);
|
||||
}
|
||||
let res = timeline
|
||||
|
||||
let _a=timeline
|
||||
.remote_client
|
||||
.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
|
||||
|
||||
if let Err(e) = res {
|
||||
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
|
||||
// demonstrating this deadlock (without spawn_blocking): stop will drop
|
||||
// queued items, which will have ResidentLayer's, and those drops would try
|
||||
// to re-entrantly lock the RemoteTimelineClient inner state.
|
||||
if !timeline.is_active() {
|
||||
tracing::info!("scheduling deletion on drop failed: {e:#}");
|
||||
} else {
|
||||
tracing::warn!("scheduling deletion on drop failed: {e:#}");
|
||||
}
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
|
||||
} else {
|
||||
LAYER_IMPL_METRICS.inc_completed_deletes();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -868,12 +804,6 @@ impl LayerInner {
|
||||
// This object acts as a RAII guard on these metrics: increment on construction
|
||||
timeline.metrics.inc_layer(&desc);
|
||||
|
||||
// New layers are visible by default. This metric is later updated on drop or in set_visibility
|
||||
timeline
|
||||
.metrics
|
||||
.visible_physical_size_gauge
|
||||
.add(desc.file_size);
|
||||
|
||||
LayerInner {
|
||||
conf,
|
||||
path: local_path,
|
||||
@@ -894,13 +824,9 @@ impl LayerInner {
|
||||
}
|
||||
|
||||
fn delete_on_drop(&self) {
|
||||
let res =
|
||||
self.wanted_deleted
|
||||
let _a=self.wanted_deleted
|
||||
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
|
||||
|
||||
if res.is_ok() {
|
||||
LAYER_IMPL_METRICS.inc_started_deletes();
|
||||
}
|
||||
}
|
||||
|
||||
/// Cancellation safe, however dropping the future and calling this method again might result
|
||||
@@ -938,12 +864,6 @@ impl LayerInner {
|
||||
// drop the DownloadedLayer outside of the holding the guard
|
||||
drop(strong);
|
||||
|
||||
// idea here is that only one evicter should ever get to witness a strong reference,
|
||||
// which means whenever get_or_maybe_download upgrades a weak, it must mark up a
|
||||
// cancelled eviction and signal us, like it currently does.
|
||||
//
|
||||
// a second concurrent evict_and_wait will not see a strong reference.
|
||||
LAYER_IMPL_METRICS.inc_started_evictions();
|
||||
}
|
||||
|
||||
let changed = rx.changed();
|
||||
@@ -983,15 +903,13 @@ impl LayerInner {
|
||||
// get_or_init_detached can:
|
||||
// - be fast (mutex lock) OR uncontested semaphore permit acquire
|
||||
// - be slow (wait for semaphore permit or closing)
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
let locked = self
|
||||
.inner
|
||||
.get_or_init_detached_measured(Some(&mut wait_for_download_recorder))
|
||||
.await
|
||||
.map(|mut guard| guard.get_and_upgrade().ok_or(guard));
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
|
||||
match locked {
|
||||
// this path could had been a RwLock::read
|
||||
@@ -1004,8 +922,7 @@ impl LayerInner {
|
||||
// note that we also have dropped the Guard; this is fine, because we just made
|
||||
// a state change and are holding a strong reference to be returned.
|
||||
self.status.as_ref().unwrap().send_replace(Status::Resident);
|
||||
LAYER_IMPL_METRICS
|
||||
.inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
|
||||
|
||||
|
||||
return Ok(strong);
|
||||
}
|
||||
@@ -1032,8 +949,7 @@ impl LayerInner {
|
||||
.upgrade()
|
||||
.ok_or(DownloadError::TimelineShutdown)?;
|
||||
|
||||
// count cancellations, which currently remain largely unexpected
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
|
||||
// check if we really need to be downloaded: this can happen if a read access won the
|
||||
// semaphore before eviction.
|
||||
@@ -1045,7 +961,6 @@ impl LayerInner {
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
let needs_download = needs_download?;
|
||||
|
||||
@@ -1056,7 +971,7 @@ impl LayerInner {
|
||||
self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
|
||||
.await?;
|
||||
|
||||
LAYER_IMPL_METRICS.inc_init_needed_no_download();
|
||||
|
||||
|
||||
return Ok(self.initialize_after_layer_is_on_disk(permit));
|
||||
};
|
||||
@@ -1097,13 +1012,13 @@ impl LayerInner {
|
||||
async move {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
let res = self
|
||||
.download_init_and_wait(timeline, permit, ctx.attached_child())
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
.instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
|
||||
@@ -1121,7 +1036,6 @@ impl LayerInner {
|
||||
"unexpectedly on-demand downloading for task kind {:?}",
|
||||
ctx.task_kind()
|
||||
);
|
||||
crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
|
||||
|
||||
let really_error =
|
||||
matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
|
||||
@@ -1173,20 +1087,7 @@ impl LayerInner {
|
||||
|
||||
let res = this.download_and_init(timeline, permit, &ctx).await;
|
||||
|
||||
if let Err(res) = tx.send(res) {
|
||||
match res {
|
||||
Ok(_res) => {
|
||||
tracing::debug!("layer initialized, but caller has been cancelled");
|
||||
LAYER_IMPL_METRICS.inc_init_completed_without_requester();
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::info!(
|
||||
"layer file download failed, and caller has been cancelled: {e:?}"
|
||||
);
|
||||
LAYER_IMPL_METRICS.inc_download_failed_without_requester();
|
||||
}
|
||||
}
|
||||
}
|
||||
let _a =tx.send(res);
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
@@ -1238,21 +1139,9 @@ impl LayerInner {
|
||||
}
|
||||
};
|
||||
tracing::info!(size=%self.desc.file_size, %latency_millis, "on-demand download successful");
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_add(self.desc.file_size);
|
||||
|
||||
self.consecutive_failures.store(0, Ordering::Relaxed);
|
||||
|
||||
let since_last_eviction = self
|
||||
.last_evicted_at
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.map(|ts| ts.elapsed());
|
||||
if let Some(since_last_eviction) = since_last_eviction {
|
||||
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
|
||||
}
|
||||
|
||||
self.access_stats.record_residence_event();
|
||||
|
||||
Ok(self.initialize_after_layer_is_on_disk(permit))
|
||||
@@ -1407,12 +1296,8 @@ impl LayerInner {
|
||||
|
||||
tracing::debug!("eviction started");
|
||||
|
||||
let res = self.wait_for_turn_and_evict(only_version).await;
|
||||
// metrics: ignore the Ok branch, it is not done yet
|
||||
if let Err(e) = res {
|
||||
tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
|
||||
LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
|
||||
}
|
||||
let _a = self.wait_for_turn_and_evict(only_version).await;
|
||||
|
||||
};
|
||||
|
||||
Self::spawn(start_evicting.instrument(span));
|
||||
@@ -1532,21 +1417,13 @@ impl LayerInner {
|
||||
Self::spawn_blocking(move || {
|
||||
let _span = span.entered();
|
||||
|
||||
let res = self.evict_blocking(&timeline, &gate, &permit);
|
||||
let res = self.evict_blocking( &gate, &permit);
|
||||
|
||||
let waiters = self.inner.initializer_count();
|
||||
|
||||
if waiters > 0 {
|
||||
LAYER_IMPL_METRICS.inc_evicted_with_waiters();
|
||||
}
|
||||
|
||||
|
||||
let completed_in = spawned_at.elapsed();
|
||||
LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
|
||||
|
||||
match res {
|
||||
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
|
||||
Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
|
||||
}
|
||||
|
||||
|
||||
tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
|
||||
});
|
||||
@@ -1557,7 +1434,6 @@ impl LayerInner {
|
||||
/// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
|
||||
fn evict_blocking(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
_gate: &gate::GateGuard,
|
||||
_permit: &heavier_once_cell::InitPermit,
|
||||
) -> Result<(), EvictionCancelled> {
|
||||
@@ -1570,17 +1446,7 @@ impl LayerInner {
|
||||
Ok(elapsed) => {
|
||||
let accessed_and_visible = self.access_stats.accessed()
|
||||
&& self.access_stats.visibility() == LayerVisibilityHint::Visible;
|
||||
if accessed_and_visible {
|
||||
// Only layers used for reads contribute to our "low residence" metric that is used
|
||||
// to detect thrashing. Layers promoted for other reasons (e.g. compaction) are allowed
|
||||
// to be rapidly evicted without contributing to this metric.
|
||||
timeline
|
||||
.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.read()
|
||||
.unwrap()
|
||||
.observe(elapsed);
|
||||
}
|
||||
|
||||
|
||||
tracing::info!(
|
||||
residence_millis = elapsed.as_millis(),
|
||||
@@ -1592,10 +1458,6 @@ impl LayerInner {
|
||||
tracing::info!("evicted layer after unknown residence period");
|
||||
}
|
||||
}
|
||||
timeline.metrics.evictions.inc();
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_sub(self.desc.file_size);
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
tracing::error!(
|
||||
@@ -1812,13 +1674,7 @@ impl DownloadedLayer {
|
||||
match res {
|
||||
Ok(layer) => Ok(layer),
|
||||
Err(err) => {
|
||||
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
|
||||
// We log this message once over the lifetime of `Self`
|
||||
// => Ok and good to log backtrace and path here.
|
||||
tracing::error!(
|
||||
"layer load failed, assuming permanent failure: {}: {err:?}",
|
||||
owner.path
|
||||
);
|
||||
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
@@ -2026,218 +1882,6 @@ impl From<ResidentLayer> for Layer {
|
||||
}
|
||||
}
|
||||
|
||||
use metrics::IntCounter;
|
||||
|
||||
pub(crate) struct LayerImplMetrics {
|
||||
started_evictions: IntCounter,
|
||||
completed_evictions: IntCounter,
|
||||
cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
|
||||
|
||||
started_deletes: IntCounter,
|
||||
completed_deletes: IntCounter,
|
||||
failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
|
||||
|
||||
rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
|
||||
inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
|
||||
redownload_after: metrics::Histogram,
|
||||
time_to_evict: metrics::Histogram,
|
||||
}
|
||||
|
||||
impl Default for LayerImplMetrics {
|
||||
fn default() -> Self {
|
||||
use enum_map::Enum;
|
||||
|
||||
// reminder: these will be pageserver_layer_* with "_total" suffix
|
||||
|
||||
let started_evictions = metrics::register_int_counter!(
|
||||
"pageserver_layer_started_evictions",
|
||||
"Evictions started in the Layer implementation"
|
||||
)
|
||||
.unwrap();
|
||||
let completed_evictions = metrics::register_int_counter!(
|
||||
"pageserver_layer_completed_evictions",
|
||||
"Evictions completed in the Layer implementation"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let cancelled_evictions = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_cancelled_evictions_count",
|
||||
"Different reasons for evictions to have been cancelled or failed",
|
||||
&["reason"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let reason = EvictionCancelled::from_usize(i);
|
||||
let s = reason.as_str();
|
||||
cancelled_evictions.with_label_values(&[s])
|
||||
}));
|
||||
|
||||
let started_deletes = metrics::register_int_counter!(
|
||||
"pageserver_layer_started_deletes",
|
||||
"Deletions on drop pending in the Layer implementation"
|
||||
)
|
||||
.unwrap();
|
||||
let completed_deletes = metrics::register_int_counter!(
|
||||
"pageserver_layer_completed_deletes",
|
||||
"Deletions on drop completed in the Layer implementation"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let failed_deletes = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_failed_deletes_count",
|
||||
"Different reasons for deletions on drop to have failed",
|
||||
&["reason"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let reason = DeleteFailed::from_usize(i);
|
||||
let s = reason.as_str();
|
||||
failed_deletes.with_label_values(&[s])
|
||||
}));
|
||||
|
||||
let rare_counters = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_assumed_rare_count",
|
||||
"Times unexpected or assumed rare event happened",
|
||||
&["event"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let event = RareEvent::from_usize(i);
|
||||
let s = event.as_str();
|
||||
rare_counters.with_label_values(&[s])
|
||||
}));
|
||||
|
||||
let inits_cancelled = metrics::register_int_counter!(
|
||||
"pageserver_layer_inits_cancelled_count",
|
||||
"Times Layer initialization was cancelled",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let redownload_after = {
|
||||
let minute = 60.0;
|
||||
let hour = 60.0 * minute;
|
||||
metrics::register_histogram!(
|
||||
"pageserver_layer_redownloaded_after",
|
||||
"Time between evicting and re-downloading.",
|
||||
vec![
|
||||
10.0,
|
||||
30.0,
|
||||
minute,
|
||||
5.0 * minute,
|
||||
15.0 * minute,
|
||||
30.0 * minute,
|
||||
hour,
|
||||
12.0 * hour,
|
||||
]
|
||||
)
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
let time_to_evict = metrics::register_histogram!(
|
||||
"pageserver_layer_eviction_held_permit_seconds",
|
||||
"Time eviction held the permit.",
|
||||
vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
started_evictions,
|
||||
completed_evictions,
|
||||
cancelled_evictions,
|
||||
|
||||
started_deletes,
|
||||
completed_deletes,
|
||||
failed_deletes,
|
||||
|
||||
rare_counters,
|
||||
inits_cancelled,
|
||||
redownload_after,
|
||||
time_to_evict,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerImplMetrics {
|
||||
fn inc_started_evictions(&self) {
|
||||
self.started_evictions.inc();
|
||||
}
|
||||
fn inc_completed_evictions(&self) {
|
||||
self.completed_evictions.inc();
|
||||
}
|
||||
fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
|
||||
self.cancelled_evictions[reason].inc()
|
||||
}
|
||||
|
||||
fn inc_started_deletes(&self) {
|
||||
self.started_deletes.inc();
|
||||
}
|
||||
fn inc_completed_deletes(&self) {
|
||||
self.completed_deletes.inc();
|
||||
}
|
||||
fn inc_deletes_failed(&self, reason: DeleteFailed) {
|
||||
self.failed_deletes[reason].inc();
|
||||
}
|
||||
|
||||
/// Counted separatedly from failed layer deletes because we will complete the layer deletion
|
||||
/// attempt regardless of failure to delete local file.
|
||||
fn inc_delete_removes_failed(&self) {
|
||||
self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
|
||||
}
|
||||
|
||||
/// Expected rare just as cancellations are rare, but we could have cancellations separate from
|
||||
/// the single caller which can start the download, so use this counter to separte them.
|
||||
fn inc_init_completed_without_requester(&self) {
|
||||
self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because cancellations are unexpected, and failures are unexpected
|
||||
fn inc_download_failed_without_requester(&self) {
|
||||
self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
|
||||
}
|
||||
|
||||
/// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
|
||||
///
|
||||
/// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
|
||||
/// Option.
|
||||
fn inc_raced_wanted_evicted_accesses(&self) {
|
||||
self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
|
||||
}
|
||||
|
||||
/// These are only expected for [`Self::inc_init_cancelled`] amount when
|
||||
/// running with remote storage.
|
||||
fn inc_init_needed_no_download(&self) {
|
||||
self.rare_counters[RareEvent::InitWithoutDownload].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because all layer files should be readable and good
|
||||
fn inc_permanent_loading_failures(&self) {
|
||||
self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
|
||||
}
|
||||
|
||||
fn inc_init_cancelled(&self) {
|
||||
self.inits_cancelled.inc()
|
||||
}
|
||||
|
||||
fn record_redownloaded_after(&self, duration: std::time::Duration) {
|
||||
self.redownload_after.observe(duration.as_secs_f64())
|
||||
}
|
||||
|
||||
/// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
|
||||
/// instead cancel eviction if we would have read waiters. We cannot however separate reads
|
||||
/// from other evictions, so this could have noise as well.
|
||||
fn inc_evicted_with_waiters(&self) {
|
||||
self.rare_counters[RareEvent::EvictedWithWaiters].inc();
|
||||
}
|
||||
|
||||
/// Recorded at least initially as the permit is now acquired in async context before
|
||||
/// spawn_blocking action.
|
||||
fn record_time_to_evict(&self, duration: std::time::Duration) {
|
||||
self.time_to_evict.observe(duration.as_secs_f64())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, enum_map::Enum)]
|
||||
enum EvictionCancelled {
|
||||
@@ -2254,21 +1898,6 @@ enum EvictionCancelled {
|
||||
UnexpectedEvictedState,
|
||||
}
|
||||
|
||||
impl EvictionCancelled {
|
||||
fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
EvictionCancelled::LayerGone => "layer_gone",
|
||||
EvictionCancelled::TimelineGone => "timeline_gone",
|
||||
EvictionCancelled::VersionCheckFailed => "version_check_fail",
|
||||
EvictionCancelled::FileNotFound => "file_not_found",
|
||||
EvictionCancelled::RemoveFailed => "remove_failed",
|
||||
EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
|
||||
EvictionCancelled::LostToDownload => "lost_to_download",
|
||||
EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
|
||||
EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(enum_map::Enum)]
|
||||
enum DeleteFailed {
|
||||
@@ -2276,15 +1905,6 @@ enum DeleteFailed {
|
||||
DeleteSchedulingFailed,
|
||||
}
|
||||
|
||||
impl DeleteFailed {
|
||||
fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
DeleteFailed::TimelineGone => "timeline_gone",
|
||||
DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(enum_map::Enum)]
|
||||
enum RareEvent {
|
||||
RemoveOnDropFailed,
|
||||
@@ -2296,21 +1916,3 @@ enum RareEvent {
|
||||
EvictedWithWaiters,
|
||||
}
|
||||
|
||||
impl RareEvent {
|
||||
fn as_str(&self) -> &'static str {
|
||||
use RareEvent::*;
|
||||
|
||||
match self {
|
||||
RemoveOnDropFailed => "remove_on_drop_failed",
|
||||
InitCompletedWithoutRequester => "init_completed_without",
|
||||
DownloadFailedWithoutRequester => "download_failed_without",
|
||||
UpgradedWantedEvicted => "raced_wanted_evicted",
|
||||
InitWithoutDownload => "init_needed_no_download",
|
||||
PermanentLoadingFailure => "permanent_loading_failure",
|
||||
EvictedWithWaiters => "evicted_with_waiters",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
|
||||
once_cell::sync::Lazy::new(LayerImplMetrics::default);
|
||||
|
||||
@@ -238,7 +238,7 @@ async fn smoke_test() {
|
||||
rtc.get_remote_physical_size(),
|
||||
dummy_layer.metadata().file_size
|
||||
);
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
|
||||
|
||||
}
|
||||
|
||||
/// This test demonstrates a previous hang when a eviction and deletion were requested at the same
|
||||
@@ -311,11 +311,6 @@ async fn evict_and_wait_on_wanted_deleted() {
|
||||
|
||||
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
|
||||
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
|
||||
}
|
||||
|
||||
/// This test ensures we are able to read the layer while the layer eviction has been
|
||||
@@ -366,7 +361,7 @@ fn read_wins_pending_eviction() {
|
||||
tokio::time::timeout(ADVANCE, &mut evict_and_wait)
|
||||
.await
|
||||
.expect_err("should had been a timeout since we are holding the layer resident");
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
|
||||
|
||||
|
||||
let (completion, barrier) = utils::completion::channel();
|
||||
let (arrival, arrived_at_barrier) = utils::completion::channel();
|
||||
@@ -398,18 +393,7 @@ fn read_wins_pending_eviction() {
|
||||
|
||||
// works as intended: evictions lose to "downloads"
|
||||
assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
|
||||
|
||||
// this is not wrong: the eviction is technically still "on the way" as it's still queued
|
||||
// because of a failpoint
|
||||
assert_eq!(
|
||||
0,
|
||||
LAYER_IMPL_METRICS
|
||||
.cancelled_evictions
|
||||
.values()
|
||||
.map(|ctr| ctr.get())
|
||||
.sum::<u64>()
|
||||
);
|
||||
|
||||
|
||||
drop(completion);
|
||||
|
||||
@@ -417,26 +401,9 @@ fn read_wins_pending_eviction() {
|
||||
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
|
||||
.await;
|
||||
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
|
||||
|
||||
|
||||
// now we finally can observe the original eviction failing
|
||||
// it would had been possible to observe it earlier, but here it is guaranteed to have
|
||||
// happened.
|
||||
assert_eq!(
|
||||
1,
|
||||
LAYER_IMPL_METRICS
|
||||
.cancelled_evictions
|
||||
.values()
|
||||
.map(|ctr| ctr.get())
|
||||
.sum::<u64>()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
1,
|
||||
LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
|
||||
);
|
||||
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@@ -499,7 +466,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
|
||||
tokio::time::timeout(ADVANCE, &mut evict_and_wait)
|
||||
.await
|
||||
.expect_err("should had been a timeout since we are holding the layer resident");
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
|
||||
|
||||
|
||||
let (completion1, barrier) = utils::completion::channel();
|
||||
let mut completion1 = Some(completion1);
|
||||
@@ -534,20 +501,9 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
|
||||
|
||||
// works as intended: evictions lose to "downloads"
|
||||
assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
|
||||
|
||||
|
||||
// this is not wrong: the eviction is technically still "on the way" as it's still queued
|
||||
// because of a failpoint
|
||||
assert_eq!(
|
||||
0,
|
||||
LAYER_IMPL_METRICS
|
||||
.cancelled_evictions
|
||||
.values()
|
||||
.map(|ctr| ctr.get())
|
||||
.sum::<u64>()
|
||||
);
|
||||
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
|
||||
|
||||
|
||||
// configure another failpoint for the second eviction -- evictions are per initialization,
|
||||
// so now that we've reinitialized the inner, we get to run two of them at the same time.
|
||||
@@ -567,13 +523,10 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
|
||||
|
||||
arrived_at_barrier.wait().await;
|
||||
|
||||
assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
|
||||
|
||||
|
||||
let mut release_earlier_eviction = |expected_reason| {
|
||||
assert_eq!(
|
||||
0,
|
||||
LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
|
||||
);
|
||||
let mut release_earlier_eviction = |_expected_reason| {
|
||||
|
||||
|
||||
drop(completion1.take().unwrap());
|
||||
|
||||
@@ -586,10 +539,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
1,
|
||||
LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
|
||||
);
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
@@ -612,19 +562,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
|
||||
.expect("eviction goes through now that spawn_blocking is unclogged")
|
||||
.expect("eviction should succeed, because version matches");
|
||||
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
|
||||
|
||||
// ensure the cancelled are unchanged
|
||||
assert_eq!(
|
||||
1,
|
||||
LAYER_IMPL_METRICS
|
||||
.cancelled_evictions
|
||||
.values()
|
||||
.map(|ctr| ctr.get())
|
||||
.sum::<u64>()
|
||||
);
|
||||
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@@ -714,8 +652,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
|
||||
.unwrap_err();
|
||||
assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
|
||||
|
||||
// failpoint is not counted as cancellation either
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
|
||||
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
@@ -892,8 +829,7 @@ async fn eviction_cancellation_on_drop() {
|
||||
.expect_err("should had been a timeout since we are holding the layer resident");
|
||||
}
|
||||
|
||||
// 1 == we only evict one of the layers
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
|
||||
|
||||
|
||||
drop(resident);
|
||||
|
||||
@@ -902,10 +838,7 @@ async fn eviction_cancellation_on_drop() {
|
||||
|
||||
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
|
||||
|
||||
assert_eq!(
|
||||
1,
|
||||
LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
|
||||
);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
|
||||
use rand::Rng;
|
||||
use scopeguard::defer;
|
||||
|
||||
use tokio::sync::{Semaphore, SemaphorePermit};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -19,7 +19,6 @@ use utils::completion::Barrier;
|
||||
use utils::pausable_failpoint;
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
|
||||
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
@@ -85,17 +84,15 @@ pub(crate) enum BackgroundLoopKind {
|
||||
SecondaryDownload,
|
||||
}
|
||||
|
||||
pub struct BackgroundLoopSemaphorePermit<'a> {
|
||||
pub struct BackgroundLoopSemaphorePermit {
|
||||
_permit: SemaphorePermit<'static>,
|
||||
_recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
|
||||
}
|
||||
|
||||
/// Acquires a semaphore permit, to limit concurrent background jobs.
|
||||
pub(crate) async fn acquire_concurrency_permit(
|
||||
loop_kind: BackgroundLoopKind,
|
||||
_ctx: &RequestContext,
|
||||
) -> BackgroundLoopSemaphorePermit<'static> {
|
||||
let mut recorder = metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
|
||||
) -> BackgroundLoopSemaphorePermit {
|
||||
|
||||
if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
|
||||
pausable_failpoint!("initial-size-calculation-permit-pause");
|
||||
@@ -108,11 +105,9 @@ pub(crate) async fn acquire_concurrency_permit(
|
||||
};
|
||||
let permit = semaphore.acquire().await.expect("should never close");
|
||||
|
||||
recorder.acquired();
|
||||
|
||||
BackgroundLoopSemaphorePermit {
|
||||
_permit: permit,
|
||||
_recorder: recorder,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,8 +130,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>)
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
_ = Barrier::maybe_wait(can_start) => {}
|
||||
};
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
|
||||
|
||||
compaction_loop(tenant, cancel)
|
||||
// If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
|
||||
.instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
@@ -161,8 +155,6 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>)
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
_ = Barrier::maybe_wait(can_start) => {}
|
||||
};
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
|
||||
gc_loop(tenant, cancel)
|
||||
.instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
.await;
|
||||
@@ -186,8 +178,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>)
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
_ = Barrier::maybe_wait(can_start) => {}
|
||||
};
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
|
||||
|
||||
tenant_housekeeping_loop(tenant, cancel)
|
||||
.instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
.await;
|
||||
@@ -592,8 +583,5 @@ pub(crate) fn warn_when_period_overrun(
|
||||
?task,
|
||||
"task iteration took longer than the configured period"
|
||||
);
|
||||
metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
|
||||
.with_label_values(&[task.into(), &format!("{}", period.as_secs())])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,8 +45,8 @@ pub struct Stats {
|
||||
}
|
||||
|
||||
pub enum ThrottleResult {
|
||||
NotThrottled { end: Instant },
|
||||
Throttled { end: Instant },
|
||||
NotThrottled { },
|
||||
Throttled { },
|
||||
}
|
||||
|
||||
impl Throttle {
|
||||
@@ -114,7 +114,7 @@ impl Throttle {
|
||||
let inner = self.inner.load_full(); // clones the `Inner` Arc
|
||||
|
||||
if !inner.enabled {
|
||||
return ThrottleResult::NotThrottled { end: start };
|
||||
return ThrottleResult::NotThrottled { };
|
||||
}
|
||||
|
||||
self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
|
||||
@@ -127,9 +127,9 @@ impl Throttle {
|
||||
let wait_time = end - start;
|
||||
self.sum_throttled_usecs
|
||||
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
|
||||
ThrottleResult::Throttled { end }
|
||||
ThrottleResult::Throttled { }
|
||||
} else {
|
||||
ThrottleResult::NotThrottled { end: start }
|
||||
ThrottleResult::NotThrottled { }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,7 +96,6 @@ use super::{
|
||||
AttachedTenantConf, GcError, HeatMapTimeline, MaybeOffloaded,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
};
|
||||
use crate::aux_file::AuxFileSizeEstimator;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
@@ -104,10 +103,7 @@ use crate::context::{
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
use crate::metrics::{
|
||||
DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_AMORTIZED_GLOBAL, LAYERS_PER_READ_BATCH_GLOBAL,
|
||||
LAYERS_PER_READ_GLOBAL, ScanLatencyOngoingRecording, TimelineMetrics,
|
||||
};
|
||||
use crate::metrics::TimelineMetrics;
|
||||
use crate::page_service::TenantManagerTypes;
|
||||
use crate::pgdatadir_mapping::{
|
||||
CalculateLogicalSizeError, CollectKeySpaceError, DirectoryKind, LsnForTimestamp,
|
||||
@@ -415,8 +411,6 @@ pub struct Timeline {
|
||||
/// Cloned from [`super::Tenant::pagestream_throttle`] on construction.
|
||||
pub(crate) pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
|
||||
|
||||
/// Size estimator for aux file v2
|
||||
pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
|
||||
|
||||
/// Some test cases directly place keys into the timeline without actually modifying the directory
|
||||
/// keys (i.e., DB_DIR). The test cases creating such keys will put the keyspaces here, so that
|
||||
@@ -1210,20 +1204,20 @@ impl Timeline {
|
||||
ctx.task_kind(),
|
||||
);
|
||||
|
||||
let start = crate::metrics::GET_VECTORED_LATENCY
|
||||
.for_task_kind(ctx.task_kind())
|
||||
.map(|metric| (metric, Instant::now()));
|
||||
// let start = crate::metrics::GET_VECTORED_LATENCY
|
||||
// .for_task_kind(ctx.task_kind())
|
||||
// .map(|metric| (metric, Instant::now()));
|
||||
|
||||
let res = self
|
||||
self
|
||||
.get_vectored_impl(query, &mut ValuesReconstructState::new(io_concurrency), ctx)
|
||||
.await;
|
||||
.await
|
||||
|
||||
if let Some((metric, start)) = start {
|
||||
let elapsed = start.elapsed();
|
||||
metric.observe(elapsed.as_secs_f64());
|
||||
}
|
||||
// if let Some((metric, start)) = start {
|
||||
// let elapsed = start.elapsed();
|
||||
// metric.observe(elapsed.as_secs_f64());
|
||||
// }
|
||||
|
||||
res
|
||||
|
||||
}
|
||||
|
||||
/// Scan the keyspace and return all existing key-values in the keyspace. This currently uses vectored
|
||||
@@ -1262,21 +1256,21 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let start = crate::metrics::SCAN_LATENCY
|
||||
.for_task_kind(ctx.task_kind())
|
||||
.map(ScanLatencyOngoingRecording::start_recording);
|
||||
// let start = crate::metrics::SCAN_LATENCY
|
||||
// .for_task_kind(ctx.task_kind())
|
||||
// .map(ScanLatencyOngoingRecording::start_recording);
|
||||
|
||||
let query = VersionedKeySpaceQuery::uniform(keyspace, lsn);
|
||||
|
||||
let vectored_res = self
|
||||
self
|
||||
.get_vectored_impl(query, &mut ValuesReconstructState::new(io_concurrency), ctx)
|
||||
.await;
|
||||
.await
|
||||
|
||||
if let Some(recording) = start {
|
||||
recording.observe();
|
||||
}
|
||||
// if let Some(recording) = start {
|
||||
// recording.observe();
|
||||
// }
|
||||
|
||||
vectored_res
|
||||
|
||||
}
|
||||
|
||||
pub(super) async fn get_vectored_impl(
|
||||
@@ -1385,7 +1379,7 @@ impl Timeline {
|
||||
return (key, Err(err));
|
||||
}
|
||||
};
|
||||
DELTAS_PER_READ_GLOBAL.observe(converted.num_deltas() as f64);
|
||||
|
||||
|
||||
// The walredo module expects the records to be descending in terms of Lsn.
|
||||
// And we submit the IOs in that order, so, there shuold be no need to sort here.
|
||||
@@ -1423,42 +1417,21 @@ impl Timeline {
|
||||
// when they're missing. Instead they are omitted from the resulting btree
|
||||
// (this is a requirement, not a bug). Skip updating the metric in these cases
|
||||
// to avoid infinite results.
|
||||
if !results.is_empty() {
|
||||
if layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD {
|
||||
let total_keyspace = query.total_keyspace();
|
||||
let max_request_lsn = query.high_watermark_lsn().expect("Validated previously");
|
||||
if !results.is_empty() && layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD {
|
||||
let total_keyspace = query.total_keyspace();
|
||||
let max_request_lsn = query.high_watermark_lsn().expect("Validated previously");
|
||||
|
||||
static LOG_PACER: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(60))));
|
||||
LOG_PACER.lock().unwrap().call(|| {
|
||||
let num_keys = total_keyspace.total_raw_size();
|
||||
let num_pages = results.len();
|
||||
tracing::info!(
|
||||
shard_id = %self.tenant_shard_id.shard_slug(),
|
||||
lsn = %max_request_lsn,
|
||||
"Vectored read for {total_keyspace} visited {layers_visited} layers. Returned {num_pages}/{num_keys} pages.",
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
// Records the number of layers visited in a few different ways:
|
||||
//
|
||||
// * LAYERS_PER_READ: all layers count towards every read in the batch, because each
|
||||
// layer directly affects its observed latency.
|
||||
//
|
||||
// * LAYERS_PER_READ_BATCH: all layers count towards each batch, to get the per-batch
|
||||
// layer visits and access cost.
|
||||
//
|
||||
// * LAYERS_PER_READ_AMORTIZED: the average layer count per read, to get the amortized
|
||||
// read amplification after batching.
|
||||
let layers_visited = layers_visited as f64;
|
||||
let avg_layers_visited = layers_visited / results.len() as f64;
|
||||
LAYERS_PER_READ_BATCH_GLOBAL.observe(layers_visited);
|
||||
for _ in &results {
|
||||
self.metrics.layers_per_read.observe(layers_visited);
|
||||
LAYERS_PER_READ_GLOBAL.observe(layers_visited);
|
||||
LAYERS_PER_READ_AMORTIZED_GLOBAL.observe(avg_layers_visited);
|
||||
}
|
||||
static LOG_PACER: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(60))));
|
||||
LOG_PACER.lock().unwrap().call(|| {
|
||||
let num_keys = total_keyspace.total_raw_size();
|
||||
let num_pages = results.len();
|
||||
tracing::info!(
|
||||
shard_id = %self.tenant_shard_id.shard_slug(),
|
||||
lsn = %max_request_lsn,
|
||||
"Vectored read for {total_keyspace} visited {layers_visited} layers. Returned {num_pages}/{num_keys} pages.",
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
@@ -1510,9 +1483,6 @@ impl Timeline {
|
||||
guard.layer_size_sum()
|
||||
}
|
||||
|
||||
pub(crate) fn resident_physical_size(&self) -> u64 {
|
||||
self.metrics.resident_physical_size_get()
|
||||
}
|
||||
|
||||
pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] {
|
||||
array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed))
|
||||
@@ -1575,8 +1545,6 @@ impl Timeline {
|
||||
WaitLsnTimeout::Default => self.conf.wait_lsn_timeout,
|
||||
};
|
||||
|
||||
let timer = crate::metrics::WAIT_LSN_TIME.start_timer();
|
||||
let start_finish_counterpair_guard = self.metrics.wait_lsn_start_finish_counterpair.guard();
|
||||
|
||||
let wait_for_timeout = self.last_record_lsn.wait_for_timeout(lsn, timeout);
|
||||
let wait_for_timeout = std::pin::pin!(wait_for_timeout);
|
||||
@@ -1593,11 +1561,8 @@ impl Timeline {
|
||||
ready,
|
||||
is_slow,
|
||||
elapsed_total,
|
||||
elapsed_since_last_callback,
|
||||
elapsed_since_last_callback: _,
|
||||
}| {
|
||||
self.metrics
|
||||
.wait_lsn_in_progress_micros
|
||||
.inc_by(u64::try_from(elapsed_since_last_callback.as_micros()).unwrap());
|
||||
if !is_slow {
|
||||
return;
|
||||
}
|
||||
@@ -1627,8 +1592,6 @@ impl Timeline {
|
||||
let res = wait_for_timeout.await;
|
||||
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
|
||||
drop(logging_permit);
|
||||
drop(start_finish_counterpair_guard);
|
||||
drop(timer);
|
||||
match res {
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => {
|
||||
@@ -2718,15 +2681,6 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
|
||||
}
|
||||
|
||||
fn get_evictions_low_residence_duration_metric_threshold(
|
||||
tenant_conf: &pageserver_api::models::TenantConfig,
|
||||
default_tenant_conf: &pageserver_api::config::TenantConfigToml,
|
||||
) -> Duration {
|
||||
tenant_conf
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
|
||||
}
|
||||
|
||||
fn get_image_layer_creation_check_threshold(&self) -> u8 {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -2802,28 +2756,8 @@ impl Timeline {
|
||||
|
||||
// The threshold is embedded in the metric. So, we need to update it.
|
||||
{
|
||||
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&new_conf.tenant_conf,
|
||||
&self.conf.default_tenant_conf,
|
||||
);
|
||||
|
||||
let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
|
||||
|
||||
let timeline_id_str = self.timeline_id.to_string();
|
||||
|
||||
self.remote_client.update_config(&new_conf.location);
|
||||
|
||||
self.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.write()
|
||||
.unwrap()
|
||||
.change_threshold(
|
||||
&tenant_id_str,
|
||||
&shard_id_str,
|
||||
&timeline_id_str,
|
||||
new_threshold,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2857,13 +2791,6 @@ impl Timeline {
|
||||
let (layer_flush_start_tx, _) = tokio::sync::watch::channel((0, disk_consistent_lsn));
|
||||
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
|
||||
|
||||
let evictions_low_residence_duration_metric_threshold = {
|
||||
let loaded_tenant_conf = tenant_conf.load();
|
||||
Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&loaded_tenant_conf.tenant_conf,
|
||||
&conf.default_tenant_conf,
|
||||
)
|
||||
};
|
||||
|
||||
if let Some(ancestor) = &ancestor {
|
||||
let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
|
||||
@@ -2876,12 +2803,7 @@ impl Timeline {
|
||||
let metrics = Arc::new(TimelineMetrics::new(
|
||||
&tenant_shard_id,
|
||||
&timeline_id,
|
||||
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
|
||||
"mtime",
|
||||
evictions_low_residence_duration_metric_threshold,
|
||||
),
|
||||
));
|
||||
let aux_file_metrics = metrics.aux_file_size_gauge.clone();
|
||||
|
||||
let mut result = Timeline {
|
||||
conf,
|
||||
@@ -2989,8 +2911,6 @@ impl Timeline {
|
||||
|
||||
pagestream_throttle: resources.pagestream_throttle,
|
||||
|
||||
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
|
||||
|
||||
#[cfg(test)]
|
||||
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
|
||||
|
||||
@@ -3016,10 +2936,6 @@ impl Timeline {
|
||||
result.repartition_threshold =
|
||||
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
|
||||
|
||||
result
|
||||
.metrics
|
||||
.last_record_lsn_gauge
|
||||
.set(disk_consistent_lsn.0 as i64);
|
||||
result
|
||||
})
|
||||
}
|
||||
@@ -3175,8 +3091,6 @@ impl Timeline {
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
let timer = self.metrics.load_layer_map_histo.start_timer();
|
||||
|
||||
// Scan timeline directory and create ImageLayerName and DeltaFilename
|
||||
// structs representing all files on disk
|
||||
let timeline_path = self
|
||||
@@ -3337,7 +3251,6 @@ impl Timeline {
|
||||
num_layers, disk_consistent_lsn, total_physical_size
|
||||
);
|
||||
|
||||
timer.stop_and_record();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -3405,7 +3318,7 @@ impl Timeline {
|
||||
|
||||
if let CurrentLogicalSize::Approximate(_) = ¤t_size {
|
||||
if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
|
||||
let first = self
|
||||
let _= self
|
||||
.current_logical_size
|
||||
.did_return_approximate_to_walreceiver
|
||||
.compare_exchange(
|
||||
@@ -3413,11 +3326,8 @@ impl Timeline {
|
||||
true,
|
||||
AtomicOrdering::Relaxed,
|
||||
AtomicOrdering::Relaxed,
|
||||
)
|
||||
.is_ok();
|
||||
if first {
|
||||
crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
|
||||
}
|
||||
).is_ok();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3489,7 +3399,7 @@ impl Timeline {
|
||||
self.current_logical_size.initialized.add_permits(1);
|
||||
}
|
||||
|
||||
let try_once = |attempt: usize| {
|
||||
let try_once = |_attempt: usize| {
|
||||
let background_ctx = &background_ctx;
|
||||
let self_ref = &self;
|
||||
let skip_concurrency_limiter = &skip_concurrency_limiter;
|
||||
@@ -3500,7 +3410,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
use crate::metrics::initial_logical_size::StartCircumstances;
|
||||
let (_maybe_permit, circumstances) = tokio::select! {
|
||||
let (_maybe_permit, _circumstances) = tokio::select! {
|
||||
permit = wait_for_permit => {
|
||||
(Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
|
||||
}
|
||||
@@ -3517,12 +3427,6 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
let metrics_guard = if attempt == 1 {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
|
||||
} else {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
|
||||
};
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self_ref.conf,
|
||||
self_ref
|
||||
@@ -3549,7 +3453,7 @@ impl Timeline {
|
||||
|
||||
// TODO: add aux file size to logical size
|
||||
|
||||
Ok((calculated_size, metrics_guard))
|
||||
Ok(calculated_size)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3586,27 +3490,14 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
let (calculated_size, metrics_guard) = match retrying.await {
|
||||
let calculated_size = match retrying.await {
|
||||
ControlFlow::Continue(calculated_size) => calculated_size,
|
||||
ControlFlow::Break(()) => return,
|
||||
};
|
||||
|
||||
// we cannot query current_logical_size.current_size() to know the current
|
||||
// *negative* value, only truncated to u64.
|
||||
let added = self
|
||||
.current_logical_size
|
||||
.size_added_after_initial
|
||||
.load(AtomicOrdering::Relaxed);
|
||||
|
||||
let sum = calculated_size.saturating_add_signed(added);
|
||||
|
||||
// set the gauge value before it can be set in `update_current_logical_size`.
|
||||
self.metrics.current_logical_size_gauge.set(sum);
|
||||
|
||||
self.current_logical_size
|
||||
.initial_logical_size
|
||||
.set((calculated_size, metrics_guard.calculation_result_saved()))
|
||||
.ok()
|
||||
.set((calculated_size,))
|
||||
.expect("only this task sets it");
|
||||
}
|
||||
|
||||
@@ -3671,7 +3562,7 @@ impl Timeline {
|
||||
async fn calculate_logical_size(
|
||||
&self,
|
||||
up_to_lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
_cause: LogicalSizeCalculationCause,
|
||||
_guard: &GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
@@ -3690,20 +3581,13 @@ impl Timeline {
|
||||
if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
|
||||
return Ok(size);
|
||||
}
|
||||
let storage_time_metrics = match cause {
|
||||
LogicalSizeCalculationCause::Initial
|
||||
| LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
|
||||
| LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
|
||||
LogicalSizeCalculationCause::EvictionTaskImitation => {
|
||||
&self.metrics.imitate_logical_size_histo
|
||||
}
|
||||
};
|
||||
let timer = storage_time_metrics.start_timer();
|
||||
|
||||
|
||||
let logical_size = self
|
||||
.get_current_logical_size_non_incremental(up_to_lsn, ctx)
|
||||
.await?;
|
||||
debug!("calculated logical size: {logical_size}");
|
||||
timer.stop_and_record();
|
||||
|
||||
Ok(logical_size)
|
||||
}
|
||||
|
||||
@@ -3712,21 +3596,6 @@ impl Timeline {
|
||||
let logical_size = &self.current_logical_size;
|
||||
logical_size.increment_size(delta);
|
||||
|
||||
// Also set the value in the prometheus gauge. Note that
|
||||
// there is a race condition here: if this is is called by two
|
||||
// threads concurrently, the prometheus gauge might be set to
|
||||
// one value while current_logical_size is set to the
|
||||
// other.
|
||||
match logical_size.current_size() {
|
||||
CurrentLogicalSize::Exact(ref new_current_size) => self
|
||||
.metrics
|
||||
.current_logical_size_gauge
|
||||
.set(new_current_size.into()),
|
||||
CurrentLogicalSize::Approximate(_) => {
|
||||
// don't update the gauge yet, this allows us not to update the gauge back and
|
||||
// forth between the initial size calculation task.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: MetricsUpdate) {
|
||||
@@ -3764,26 +3633,8 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: remove this, there's no place in the code that updates this aux metrics.
|
||||
let aux_metric =
|
||||
self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
|
||||
|
||||
let sum_of_entries = self
|
||||
.directory_metrics
|
||||
.iter()
|
||||
.map(|v| v.load(AtomicOrdering::Relaxed))
|
||||
.sum();
|
||||
// Set a high general threshold and a lower threshold for the auxiliary files,
|
||||
// as we can have large numbers of relations in the db directory.
|
||||
const SUM_THRESHOLD: u64 = 5000;
|
||||
const AUX_THRESHOLD: u64 = 1000;
|
||||
if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD {
|
||||
self.metrics
|
||||
.directory_entries_count_gauge
|
||||
.set(sum_of_entries);
|
||||
} else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) {
|
||||
metric.set(sum_of_entries);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
async fn find_layer(
|
||||
@@ -4503,8 +4354,6 @@ impl Timeline {
|
||||
|
||||
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
|
||||
assert!(new_lsn.is_aligned());
|
||||
|
||||
self.metrics.last_record_lsn_gauge.set(new_lsn.0 as i64);
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
|
||||
@@ -4624,17 +4473,10 @@ impl Timeline {
|
||||
"stalling layer flushes for compaction backpressure at {l0_count} \
|
||||
L0 layers ({frozen_count} frozen layers with {frozen_size} bytes)"
|
||||
);
|
||||
let stall_timer = self
|
||||
.metrics
|
||||
.flush_delay_histo
|
||||
.start_timer()
|
||||
.record_on_drop();
|
||||
|
||||
tokio::select! {
|
||||
result = watch_l0.wait_for(|l0| *l0 < stall_threshold) => {
|
||||
if let Ok(l0) = result.as_deref() {
|
||||
let delay = stall_timer.elapsed().as_secs_f64();
|
||||
info!("resuming layer flushes at {l0} L0 layers after {delay:.3}s");
|
||||
}
|
||||
_result = watch_l0.wait_for(|l0| *l0 < stall_threshold) => {
|
||||
|
||||
},
|
||||
_ = self.cancel.cancelled() => {},
|
||||
}
|
||||
@@ -4643,7 +4485,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Flush the layer.
|
||||
let flush_timer = self.metrics.flush_time_histo.start_timer();
|
||||
let flush_timer = Instant::now();
|
||||
match self.flush_frozen_layer(layer, ctx).await {
|
||||
Ok(layer_lsn) => flushed_to_lsn = max(flushed_to_lsn, layer_lsn),
|
||||
Err(FlushLayerError::Cancelled) => {
|
||||
@@ -4659,7 +4501,7 @@ impl Timeline {
|
||||
break err.map(|_| ());
|
||||
}
|
||||
}
|
||||
let flush_duration = flush_timer.stop_and_record();
|
||||
let flush_duration = flush_timer.elapsed();
|
||||
|
||||
// Notify the tenant compaction loop if L0 compaction is needed.
|
||||
let l0_count = *watch_l0.borrow();
|
||||
@@ -4677,11 +4519,7 @@ impl Timeline {
|
||||
"delaying layer flush by {delay:.3}s for compaction backpressure at \
|
||||
{l0_count} L0 layers ({frozen_count} frozen layers with {frozen_size} bytes)"
|
||||
);
|
||||
let _delay_timer = self
|
||||
.metrics
|
||||
.flush_delay_histo
|
||||
.start_timer()
|
||||
.record_on_drop();
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(flush_duration) => {},
|
||||
_ = watch_l0.wait_for(|l0| *l0 < delay_threshold) => {},
|
||||
@@ -4923,9 +4761,6 @@ impl Timeline {
|
||||
"disk_consistent_lsn must be growing monotonously at runtime; current {old_value}, offered {new_value}"
|
||||
);
|
||||
|
||||
self.metrics
|
||||
.disk_consistent_lsn_gauge
|
||||
.set(new_value.0 as i64);
|
||||
new_value != old_value
|
||||
}
|
||||
|
||||
@@ -5411,7 +5246,7 @@ impl Timeline {
|
||||
last_status: LastImageLayerCreationStatus,
|
||||
yield_for_l0: bool,
|
||||
) -> Result<(Vec<ResidentLayer>, LastImageLayerCreationStatus), CreateImageLayersError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
|
||||
|
||||
if partitioning.parts.is_empty() {
|
||||
warn!("no partitions to create image layers for");
|
||||
@@ -5444,8 +5279,8 @@ impl Timeline {
|
||||
|
||||
let mut all_generated = true;
|
||||
|
||||
let mut partition_processed = 0;
|
||||
let mut total_partitions = partitioning.parts.len();
|
||||
|
||||
let total_partitions = partitioning.parts.len();
|
||||
let mut last_partition_processed = None;
|
||||
let mut partition_parts = partitioning.parts.clone();
|
||||
|
||||
@@ -5470,7 +5305,7 @@ impl Timeline {
|
||||
break; // with found=false
|
||||
}
|
||||
partition_parts = partition_parts.split_off(i + 1); // Remove the first i + 1 elements
|
||||
total_partitions = partition_parts.len();
|
||||
|
||||
// Update the start key to the partition start.
|
||||
start = partition_parts[0].start().unwrap();
|
||||
found = true;
|
||||
@@ -5487,7 +5322,6 @@ impl Timeline {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
partition_processed += 1;
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
|
||||
if compact_metadata {
|
||||
@@ -5650,28 +5484,16 @@ impl Timeline {
|
||||
.open_mut()?
|
||||
.track_new_image_layers(&image_layers, &self.metrics);
|
||||
drop_wlock(guard);
|
||||
let duration = timer.stop_and_record();
|
||||
|
||||
|
||||
// Creating image layers may have caused some previously visible layers to be covered
|
||||
if !image_layers.is_empty() {
|
||||
self.update_layer_visibility().await?;
|
||||
}
|
||||
|
||||
let total_layer_size = image_layers
|
||||
.iter()
|
||||
.map(|l| l.metadata().file_size)
|
||||
.sum::<u64>();
|
||||
|
||||
|
||||
if !image_layers.is_empty() {
|
||||
info!(
|
||||
"created {} image layers ({} bytes) in {}s, processed {} out of {} partitions",
|
||||
image_layers.len(),
|
||||
total_layer_size,
|
||||
duration.as_secs_f64(),
|
||||
partition_processed,
|
||||
total_partitions
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Ok((
|
||||
image_layers,
|
||||
@@ -6206,11 +6028,7 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<GcCutoffs, PageReconstructError> {
|
||||
let _timer = self
|
||||
.metrics
|
||||
.find_gc_cutoffs_histo
|
||||
.start_timer()
|
||||
.record_on_drop();
|
||||
|
||||
|
||||
pausable_failpoint!("Timeline::find_gc_cutoffs-pausable");
|
||||
|
||||
@@ -6277,7 +6095,7 @@ impl Timeline {
|
||||
guard = self.gc_lock.lock() => guard,
|
||||
_ = self.cancel.cancelled() => return Ok(GcResult::default()),
|
||||
};
|
||||
let timer = self.metrics.garbage_collect_histo.start_timer();
|
||||
|
||||
|
||||
fail_point!("before-timeline-gc");
|
||||
|
||||
@@ -6334,9 +6152,7 @@ impl Timeline {
|
||||
// It is an easy way to unset it when standby disappears without adding
|
||||
// more conf options.
|
||||
self.standby_horizon.store(Lsn::INVALID);
|
||||
self.metrics
|
||||
.standby_horizon_gauge
|
||||
.set(Lsn::INVALID.0 as i64);
|
||||
|
||||
|
||||
let res = self
|
||||
.gc_timeline(
|
||||
@@ -6351,8 +6167,7 @@ impl Timeline {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// only record successes
|
||||
timer.stop_and_record();
|
||||
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
@@ -1133,16 +1133,15 @@ impl Timeline {
|
||||
|
||||
// 1. L0 Compact
|
||||
let l0_outcome = {
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
let l0_outcome = self
|
||||
|
||||
self
|
||||
.compact_level0(
|
||||
target_file_size,
|
||||
options.flags.contains(CompactFlags::ForceL0Compaction),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
timer.stop_and_record();
|
||||
l0_outcome
|
||||
.await?
|
||||
|
||||
};
|
||||
|
||||
if options.flags.contains(CompactFlags::OnlyL0Compaction) {
|
||||
@@ -4041,7 +4040,7 @@ impl TimelineAdaptor {
|
||||
key_range: &Range<Key>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), CreateImageLayersError> {
|
||||
let timer = self.timeline.metrics.create_images_time_histo.start_timer();
|
||||
|
||||
|
||||
let image_layer_writer = ImageLayerWriter::new(
|
||||
self.timeline.conf,
|
||||
@@ -4087,7 +4086,7 @@ impl TimelineAdaptor {
|
||||
self.new_images.push(image_layer);
|
||||
}
|
||||
|
||||
timer.stop_and_record();
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -126,7 +126,7 @@ impl Timeline {
|
||||
) -> ControlFlow<(), Instant> {
|
||||
debug!("eviction iteration: {policy:?}");
|
||||
let start = Instant::now();
|
||||
let (period, threshold) = match policy {
|
||||
let (period, _) = match policy {
|
||||
EvictionPolicy::NoEviction => {
|
||||
// check again in 10 seconds; XXX config watch mechanism
|
||||
return ControlFlow::Continue(Instant::now() + Duration::from_secs(10));
|
||||
@@ -159,16 +159,6 @@ impl Timeline {
|
||||
period,
|
||||
BackgroundLoopKind::Eviction,
|
||||
);
|
||||
// FIXME: if we were to mix policies on a pageserver, we would have no way to sense this. I
|
||||
// don't think that is a relevant fear however, and regardless the imitation should be the
|
||||
// most costly part.
|
||||
crate::metrics::EVICTION_ITERATION_DURATION
|
||||
.get_metric_with_label_values(&[
|
||||
&format!("{}", period.as_secs()),
|
||||
&format!("{}", threshold.as_secs()),
|
||||
])
|
||||
.unwrap()
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
ControlFlow::Continue(start + period)
|
||||
}
|
||||
@@ -325,7 +315,7 @@ impl Timeline {
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
|
||||
) -> ControlFlow<(), BackgroundLoopSemaphorePermit> {
|
||||
let acquire_permit =
|
||||
crate::tenant::tasks::acquire_concurrency_permit(BackgroundLoopKind::Eviction, ctx);
|
||||
|
||||
@@ -367,7 +357,7 @@ impl Timeline {
|
||||
p: &EvictionPolicyLayerAccessThreshold,
|
||||
cancel: &CancellationToken,
|
||||
gate: &GateGuard,
|
||||
permit: BackgroundLoopSemaphorePermit<'static>,
|
||||
permit: BackgroundLoopSemaphorePermit,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<()> {
|
||||
if !self.tenant_shard_id.is_shard_zero() {
|
||||
|
||||
@@ -59,11 +59,7 @@ impl HeatmapLayersDownloader {
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
resident_size=%timeline.resident_physical_size(),
|
||||
heatmap_layers=%heatmap.all_layers().count(),
|
||||
"Starting heatmap layers download"
|
||||
);
|
||||
|
||||
|
||||
let stream = futures::stream::iter(heatmap.all_layers().cloned().filter_map(
|
||||
|layer| {
|
||||
@@ -93,7 +89,7 @@ impl HeatmapLayersDownloader {
|
||||
tokio::select! {
|
||||
_ = stream.collect::<()>() => {
|
||||
tracing::info!(
|
||||
resident_size=%timeline.resident_physical_size(),
|
||||
|
||||
"Heatmap layers download completed"
|
||||
);
|
||||
},
|
||||
|
||||
@@ -290,7 +290,7 @@ impl OpenLayerManager {
|
||||
lsn: Lsn,
|
||||
last_freeze_at: &AtomicLsn,
|
||||
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
|
||||
metrics: &TimelineMetrics,
|
||||
_metrics: &TimelineMetrics,
|
||||
) -> bool {
|
||||
let Lsn(last_record_lsn) = lsn;
|
||||
let end_lsn = Lsn(last_record_lsn + 1);
|
||||
@@ -299,10 +299,6 @@ impl OpenLayerManager {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
open_layer.freeze(end_lsn).await;
|
||||
|
||||
// Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
|
||||
// TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
|
||||
// reference to the timeline metrics. Other methods use a metrics borrow as well.
|
||||
metrics.inc_frozen_layer(open_layer);
|
||||
|
||||
// The layer is no longer open, update the layer map to reflect this.
|
||||
// We will replace it with on-disk historics below.
|
||||
@@ -334,16 +330,12 @@ impl OpenLayerManager {
|
||||
pub(crate) fn track_new_image_layers(
|
||||
&mut self,
|
||||
image_layers: &[ResidentLayer],
|
||||
metrics: &TimelineMetrics,
|
||||
_metrics: &TimelineMetrics,
|
||||
) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for layer in image_layers {
|
||||
Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
|
||||
// record these here instead of Layer::finish_creating because otherwise partial
|
||||
// failure with create_image_layers would balloon up the physical size gauge. downside
|
||||
// is that all layers need to be created before metrics are updated.
|
||||
metrics.record_new_file_metrics(layer.layer_desc().file_size);
|
||||
}
|
||||
updates.flush();
|
||||
}
|
||||
@@ -353,14 +345,13 @@ impl OpenLayerManager {
|
||||
&mut self,
|
||||
delta_layer: Option<&ResidentLayer>,
|
||||
frozen_layer_for_check: &Arc<InMemoryLayer>,
|
||||
metrics: &TimelineMetrics,
|
||||
_metrics: &TimelineMetrics,
|
||||
) {
|
||||
let inmem = self
|
||||
.layer_map
|
||||
.frozen_layers
|
||||
.pop_front()
|
||||
.expect("there must be a inmem layer to flush");
|
||||
metrics.dec_frozen_layer(&inmem);
|
||||
|
||||
// Only one task may call this function at a time (for this
|
||||
// timeline). If two tasks tried to flush the same frozen
|
||||
@@ -370,7 +361,6 @@ impl OpenLayerManager {
|
||||
if let Some(l) = delta_layer {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
updates.flush();
|
||||
}
|
||||
}
|
||||
@@ -380,12 +370,11 @@ impl OpenLayerManager {
|
||||
&mut self,
|
||||
compact_from: &[Layer],
|
||||
compact_to: &[ResidentLayer],
|
||||
metrics: &TimelineMetrics,
|
||||
_metrics: &TimelineMetrics,
|
||||
) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for l in compact_to {
|
||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
}
|
||||
for l in compact_from {
|
||||
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
|
||||
@@ -438,7 +427,7 @@ impl OpenLayerManager {
|
||||
rewrite_layers: &[(Layer, ResidentLayer)],
|
||||
drop_layers: &[Layer],
|
||||
add_layers: &[ResidentLayer],
|
||||
metrics: &TimelineMetrics,
|
||||
_metrics: &TimelineMetrics,
|
||||
) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for (old_layer, new_layer) in rewrite_layers {
|
||||
@@ -469,14 +458,12 @@ impl OpenLayerManager {
|
||||
&mut self.layer_fmgr,
|
||||
);
|
||||
|
||||
metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
|
||||
}
|
||||
for l in drop_layers {
|
||||
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
for l in add_layers {
|
||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
}
|
||||
updates.flush();
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ pub(super) struct LogicalSize {
|
||||
/// the initial size at a different LSN.
|
||||
pub initial_logical_size: OnceCell<(
|
||||
u64,
|
||||
crate::metrics::initial_logical_size::FinishedCalculationGuard,
|
||||
)>,
|
||||
|
||||
/// Cancellation for the best-effort logical size calculation.
|
||||
@@ -130,11 +129,7 @@ impl CurrentLogicalSize {
|
||||
impl LogicalSize {
|
||||
pub(super) fn empty_initial() -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::with_value((0, {
|
||||
crate::metrics::initial_logical_size::START_CALCULATION
|
||||
.first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial)
|
||||
.calculation_result_saved()
|
||||
})),
|
||||
initial_logical_size: OnceCell::with_value((0,)),
|
||||
cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
|
||||
initial_part_end: None,
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
@@ -159,7 +154,7 @@ impl LogicalSize {
|
||||
// ^^^ keep this type explicit so that the casts in this function break if
|
||||
// we change the type.
|
||||
match self.initial_logical_size.get() {
|
||||
Some((initial_size, _)) => {
|
||||
Some((initial_size, )) => {
|
||||
CurrentLogicalSize::Exact(Exact(initial_size.checked_add_signed(size_increment)
|
||||
.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
|
||||
.unwrap()))
|
||||
@@ -181,7 +176,7 @@ impl LogicalSize {
|
||||
/// available for re-use. This doesn't contain the incremental part.
|
||||
pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
|
||||
match self.initial_part_end {
|
||||
Some(v) if v == lsn => self.initial_logical_size.get().map(|(s, _)| *s),
|
||||
Some(v) if v == lsn => self.initial_logical_size.get().map(|(s, )| *s),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,10 +39,6 @@ use utils::postgres_client::{
|
||||
use super::walreceiver_connection::{WalConnectionStatus, WalReceiverError};
|
||||
use super::{TaskEvent, TaskHandle, TaskStateUpdate, WalReceiverConf};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::{
|
||||
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
|
||||
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
|
||||
};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::{Timeline, debug_assert_current_span_has_tenant_and_timeline_id};
|
||||
|
||||
@@ -76,11 +72,6 @@ pub(super) async fn connection_manager_loop_step(
|
||||
}
|
||||
}
|
||||
|
||||
WALRECEIVER_ACTIVE_MANAGERS.inc();
|
||||
scopeguard::defer! {
|
||||
WALRECEIVER_ACTIVE_MANAGERS.dec();
|
||||
}
|
||||
|
||||
let id = TenantTimelineId {
|
||||
tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
|
||||
timeline_id: connection_manager_state.timeline.timeline_id,
|
||||
@@ -526,9 +517,6 @@ impl ConnectionManagerState {
|
||||
|
||||
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
|
||||
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
|
||||
WALRECEIVER_SWITCHES
|
||||
.with_label_values(&[new_sk.reason.name()])
|
||||
.inc();
|
||||
|
||||
self.drop_old_connection(true).await;
|
||||
|
||||
@@ -731,8 +719,6 @@ impl ConnectionManagerState {
|
||||
}
|
||||
};
|
||||
|
||||
WALRECEIVER_BROKER_UPDATES.inc();
|
||||
|
||||
trace!(
|
||||
"safekeeper info update: standby_horizon(cutoff)={}",
|
||||
timeline_update.standby_horizon
|
||||
@@ -742,10 +728,6 @@ impl ConnectionManagerState {
|
||||
self.timeline
|
||||
.standby_horizon
|
||||
.store(Lsn(timeline_update.standby_horizon));
|
||||
self.timeline
|
||||
.metrics
|
||||
.standby_horizon_gauge
|
||||
.set(timeline_update.standby_horizon as i64);
|
||||
}
|
||||
|
||||
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
|
||||
@@ -763,7 +745,6 @@ impl ConnectionManagerState {
|
||||
%new_safekeeper_id,
|
||||
"New SK node was added",
|
||||
);
|
||||
WALRECEIVER_CANDIDATES_ADDED.inc();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1051,7 +1032,6 @@ impl ConnectionManagerState {
|
||||
"Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections"
|
||||
);
|
||||
self.wal_connection_retries.remove(&node_id);
|
||||
WALRECEIVER_CANDIDATES_REMOVED.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1078,6 +1058,7 @@ struct NewWalConnectionCandidate {
|
||||
safekeeper_id: NodeId,
|
||||
wal_source_connconf: PgConnectionConfig,
|
||||
availability_zone: Option<String>,
|
||||
#[allow(dead_code)]
|
||||
reason: ReconnectReason,
|
||||
}
|
||||
|
||||
@@ -1106,18 +1087,6 @@ enum ReconnectReason {
|
||||
},
|
||||
}
|
||||
|
||||
impl ReconnectReason {
|
||||
fn name(&self) -> &str {
|
||||
match self {
|
||||
ReconnectReason::NoExistingConnection => "NoExistingConnection",
|
||||
ReconnectReason::LaggingWal { .. } => "LaggingWal",
|
||||
ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
|
||||
ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
|
||||
ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
|
||||
|
||||
@@ -36,7 +36,6 @@ use wal_decoder::wire_format::FromWireFormat;
|
||||
|
||||
use super::TaskStateUpdate;
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{LIVE_CONNECTIONS, WAL_INGEST, WALRECEIVER_STARTED_CONNECTIONS};
|
||||
use crate::pgdatadir_mapping::DatadirModification;
|
||||
use crate::task_mgr::{TaskKind, WALRECEIVER_RUNTIME};
|
||||
use crate::tenant::{
|
||||
@@ -137,7 +136,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
GateError::GateClosed => WalReceiverError::ClosedGate,
|
||||
})?;
|
||||
|
||||
WALRECEIVER_STARTED_CONNECTIONS.inc();
|
||||
|
||||
|
||||
// Connect to the database in replication mode.
|
||||
info!("connecting to {wal_source_connconf:?}");
|
||||
@@ -223,10 +222,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
.instrument(tracing::info_span!("poller")),
|
||||
);
|
||||
|
||||
let _guard = LIVE_CONNECTIONS
|
||||
.with_label_values(&["wal_receiver"])
|
||||
.guard();
|
||||
|
||||
let identify = identify_system(&replication_client).await?;
|
||||
info!("{identify:?}");
|
||||
|
||||
@@ -344,7 +339,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
let status_update = match replication_message {
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
|
||||
|
||||
|
||||
let mut uncommitted_records = 0;
|
||||
|
||||
@@ -417,21 +412,13 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
ctx: &RequestContext,
|
||||
uncommitted: &mut u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = modification.stats();
|
||||
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST.records_committed.inc_by(*uncommitted);
|
||||
WAL_INGEST.inc_values_committed(&stats);
|
||||
|
||||
*uncommitted = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
if !records.is_empty() {
|
||||
timeline
|
||||
.metrics
|
||||
.wal_records_received
|
||||
.inc_by(records.len() as u64);
|
||||
}
|
||||
|
||||
for interpreted in records {
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
@@ -441,9 +428,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
let local_next_record_lsn = interpreted.next_record_lsn;
|
||||
|
||||
if interpreted.is_observed() {
|
||||
WAL_INGEST.records_observed.inc();
|
||||
}
|
||||
|
||||
walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
@@ -515,12 +499,9 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = modification.stats();
|
||||
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
WAL_INGEST.inc_values_committed(&stats);
|
||||
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
@@ -534,7 +515,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
trace!("received XLogData between {startlsn} and {endlsn}");
|
||||
|
||||
WAL_INGEST.bytes_received.inc_by(data.len() as u64);
|
||||
|
||||
waldecoder.feed_bytes(data);
|
||||
|
||||
{
|
||||
@@ -576,7 +557,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
|
||||
// Ingest the records without immediately committing them.
|
||||
timeline.metrics.wal_records_received.inc();
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
@@ -592,7 +572,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
})?;
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
|
||||
filtered_records += 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ use pageserver_api::models::PageserverUtilization;
|
||||
use utils::serde_percent::Percent;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::NODE_UTILIZATION_SCORE;
|
||||
|
||||
use crate::tenant::mgr::TenantManager;
|
||||
|
||||
pub(crate) fn regenerate(
|
||||
@@ -53,7 +53,7 @@ pub(crate) fn regenerate(
|
||||
// Express a static value for how many shards we may schedule on one node
|
||||
const MAX_SHARDS: u32 = 5000;
|
||||
|
||||
let mut doc = PageserverUtilization {
|
||||
let doc = PageserverUtilization {
|
||||
disk_usage_bytes: used,
|
||||
free_space_bytes: free,
|
||||
disk_wanted_bytes,
|
||||
@@ -63,10 +63,7 @@ pub(crate) fn regenerate(
|
||||
utilization_score: None,
|
||||
captured_at: utils::serde_system_time::SystemTime(captured_at),
|
||||
};
|
||||
|
||||
// Initialize `PageserverUtilization::utilization_score`
|
||||
let score = doc.cached_score();
|
||||
NODE_UTILIZATION_SCORE.set(score);
|
||||
|
||||
|
||||
Ok(doc)
|
||||
}
|
||||
|
||||
@@ -27,12 +27,9 @@ use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
pub use pageserver_api::models::virtual_file as api;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
|
||||
|
||||
use crate::assert_u64_eq_usize::UsizeIsU64;
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
|
||||
use crate::page_cache::{PAGE_SZ, PageWriteGuard};
|
||||
pub(crate) mod io_engine;
|
||||
pub use io_engine::{
|
||||
@@ -431,9 +428,7 @@ impl OpenFiles {
|
||||
if let Some(old_file) = slot_guard.file.take() {
|
||||
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
|
||||
// distinguish the two.
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::CloseByReplace)
|
||||
.observe_closure_duration(|| drop(old_file));
|
||||
drop(old_file);
|
||||
}
|
||||
|
||||
// Prepare the slot for reuse and return it
|
||||
@@ -532,13 +527,9 @@ impl<T> MaybeFatalIo<T> for std::io::Result<T> {
|
||||
/// where "support" means that we measure wall clock time.
|
||||
macro_rules! observe_duration {
|
||||
($op:expr, $($body:tt)*) => {{
|
||||
let instant = Instant::now();
|
||||
let result = $($body)*;
|
||||
let elapsed = instant.elapsed().as_secs_f64();
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get($op)
|
||||
.observe(elapsed);
|
||||
result
|
||||
|
||||
$($body)*
|
||||
|
||||
}}
|
||||
}
|
||||
|
||||
@@ -913,7 +904,7 @@ impl VirtualFileInner {
|
||||
&self,
|
||||
buf: tokio_epoll_uring::Slice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
|
||||
where
|
||||
Buf: tokio_epoll_uring::IoBufMut + Send,
|
||||
@@ -930,9 +921,7 @@ impl VirtualFileInner {
|
||||
observe_duration!(StorageIoOperation::Read, {
|
||||
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
|
||||
let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
|
||||
if let Ok(size) = res {
|
||||
ctx.io_size_metrics().read.add(size.into_u64());
|
||||
}
|
||||
|
||||
(buf, res)
|
||||
})
|
||||
}
|
||||
@@ -953,7 +942,7 @@ impl VirtualFileInner {
|
||||
&self,
|
||||
buf: FullSlice<B>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> (FullSlice<B>, Result<usize, Error>) {
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
@@ -962,9 +951,7 @@ impl VirtualFileInner {
|
||||
observe_duration!(StorageIoOperation::Write, {
|
||||
let ((_file_guard, buf), result) =
|
||||
io_engine::get().write_at(file_guard, offset, buf).await;
|
||||
if let Ok(size) = result {
|
||||
ctx.io_size_metrics().write.add(size.into_u64());
|
||||
}
|
||||
|
||||
(buf, result)
|
||||
})
|
||||
}
|
||||
@@ -1263,9 +1250,7 @@ impl Drop for VirtualFileInner {
|
||||
// there is also operation "close-by-replace" for closes done on eviction for
|
||||
// comparison.
|
||||
if let Some(fd) = slot_guard.file.take() {
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Close)
|
||||
.observe_closure_duration(|| drop(fd));
|
||||
drop(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1334,7 +1319,6 @@ pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode, sync_mode: Syn
|
||||
set_io_mode(mode);
|
||||
io_engine::init(engine);
|
||||
SYNC_MODE.store(sync_mode as u8, std::sync::atomic::Ordering::Relaxed);
|
||||
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
|
||||
}
|
||||
|
||||
const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
|
||||
|
||||
@@ -54,14 +54,7 @@ static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
|
||||
pub(crate) fn set(engine_kind: IoEngineKind) {
|
||||
let engine: IoEngine = engine_kind.into();
|
||||
IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
|
||||
#[cfg(not(test))]
|
||||
{
|
||||
let metric = &crate::metrics::virtual_file_io_engine::KIND;
|
||||
metric.reset();
|
||||
metric
|
||||
.with_label_values(&[&format!("{engine_kind}")])
|
||||
.set(1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
|
||||
@@ -82,7 +82,7 @@ pub async fn thread_local_system() -> Handle {
|
||||
match res {
|
||||
Ok(system) => {
|
||||
info!("successfully launched system");
|
||||
metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
|
||||
|
||||
Ok(system)
|
||||
}
|
||||
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
|
||||
@@ -90,7 +90,7 @@ pub async fn thread_local_system() -> Handle {
|
||||
info_span!("stats").in_scope(|| {
|
||||
emit_launch_failure_process_stats();
|
||||
});
|
||||
metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
|
||||
|
||||
metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id);
|
||||
Err(())
|
||||
}
|
||||
|
||||
@@ -46,7 +46,6 @@ use wal_decoder::models::*;
|
||||
|
||||
use crate::ZERO_PAGE;
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::WAL_INGEST;
|
||||
use crate::pgdatadir_mapping::{DatadirModification, Version};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
@@ -235,7 +234,7 @@ impl WalIngest {
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, WalIngestError> {
|
||||
WAL_INGEST.records_received.inc();
|
||||
|
||||
let prev_len = modification.len();
|
||||
|
||||
modification.set_lsn(interpreted.next_record_lsn)?;
|
||||
@@ -1443,9 +1442,6 @@ impl WalIngest {
|
||||
gap_blocks_filled += 1;
|
||||
}
|
||||
|
||||
WAL_INGEST
|
||||
.gap_blocks_zeroed_on_rel_extend
|
||||
.inc_by(gap_blocks_filled);
|
||||
|
||||
// Log something when relation extends cause use to fill gaps
|
||||
// with zero pages. Logging is rate limited per pg version to
|
||||
|
||||
@@ -40,10 +40,7 @@ use utils::sync::gate::GateError;
|
||||
use utils::sync::heavier_once_cell;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::{
|
||||
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
|
||||
WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
|
||||
};
|
||||
|
||||
|
||||
/// The real implementation that uses a Postgres process to
|
||||
/// perform WAL replay.
|
||||
@@ -353,7 +350,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
},
|
||||
Err(permit) => {
|
||||
let start = Instant::now();
|
||||
|
||||
// acquire guard before spawning process, so that we don't spawn new processes
|
||||
// if the gate is already closed.
|
||||
let _launched_processes_guard = match self.launched_processes.enter() {
|
||||
@@ -371,13 +368,9 @@ impl PostgresRedoManager {
|
||||
.context("launch walredo process")?,
|
||||
_launched_processes_guard,
|
||||
});
|
||||
let duration = start.elapsed();
|
||||
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
|
||||
info!(
|
||||
elapsed_ms = duration.as_millis(),
|
||||
pid = proc.id(),
|
||||
"launched walredo process"
|
||||
);
|
||||
|
||||
|
||||
|
||||
self.redo_process
|
||||
.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
|
||||
proc
|
||||
@@ -471,9 +464,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
});
|
||||
|
||||
WAL_REDO_TIME.observe(duration.as_secs_f64());
|
||||
WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
|
||||
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
|
||||
|
||||
|
||||
debug!(
|
||||
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
|
||||
@@ -538,9 +529,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
// Success!
|
||||
let duration = start_time.elapsed();
|
||||
// FIXME: using the same metric here creates a bimodal distribution by default, and because
|
||||
// there could be multiple batch sizes this would be N+1 modal.
|
||||
WAL_REDO_TIME.observe(duration.as_secs_f64());
|
||||
|
||||
|
||||
debug!(
|
||||
"neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
|
||||
|
||||
@@ -21,7 +21,7 @@ use utils::poison::Poison;
|
||||
|
||||
use self::no_leak_child::NoLeakChild;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::{WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER, WalRedoKillCause};
|
||||
use crate::metrics:: WalRedoKillCause;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::span::debug_assert_current_span_has_tenant_id;
|
||||
|
||||
@@ -97,7 +97,6 @@ impl WalRedoProcess {
|
||||
// walredo request.
|
||||
.spawn_no_leak_child(tenant_shard_id)
|
||||
.context("spawn process")?;
|
||||
WAL_REDO_PROCESS_COUNTERS.started.inc();
|
||||
let mut child = scopeguard::guard(child, |child| {
|
||||
error!("killing wal-redo-postgres process due to a problem during launch");
|
||||
child.kill_and_wait(WalRedoKillCause::Startup);
|
||||
@@ -118,12 +117,7 @@ impl WalRedoProcess {
|
||||
|
||||
tokio::spawn(
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
debug!("wal-redo-postgres stderr_logger_task finished");
|
||||
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
|
||||
}
|
||||
debug!("wal-redo-postgres stderr_logger_task started");
|
||||
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
|
||||
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
let mut stderr_lines = tokio::io::BufReader::new(stderr);
|
||||
@@ -231,7 +225,7 @@ impl WalRedoProcess {
|
||||
}
|
||||
}
|
||||
protocol::build_get_page_msg(tag, &mut writebuf);
|
||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||
|
||||
|
||||
let Ok(res) =
|
||||
tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::process::{Child, Command};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
use crate::metrics::{WAL_REDO_PROCESS_COUNTERS, WalRedoKillCause};
|
||||
use crate::metrics::WalRedoKillCause;
|
||||
|
||||
/// Wrapper type around `std::process::Child` which guarantees that the child
|
||||
/// will be killed and waited-for by this process before being dropped.
|
||||
@@ -47,9 +47,6 @@ impl NoLeakChild {
|
||||
|
||||
#[instrument(skip_all, fields(pid=child.id(), ?cause))]
|
||||
pub(crate) fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) {
|
||||
scopeguard::defer! {
|
||||
WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc();
|
||||
}
|
||||
let res = child.kill();
|
||||
if let Err(e) = res {
|
||||
// This branch is very unlikely because:
|
||||
|
||||
@@ -29,7 +29,7 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
|
||||
|
||||
log.info("wait for all tenants to become active")
|
||||
wait_until_all_tenants_state(
|
||||
ps_http, "Active", iterations=10 + n_tenants, period=1, http_error_ok=False
|
||||
ps_http, "Active", iterations=10 + n_tenants, period=10, http_error_ok=False
|
||||
)
|
||||
|
||||
# ensure all layers are resident for predictiable performance
|
||||
@@ -40,7 +40,7 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
|
||||
for layer in info.historic_layers:
|
||||
assert not layer.remote
|
||||
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=60)
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=160)
|
||||
|
||||
log.info("ready")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user