page_service: batching observability & include throttled time in smgr metrics (#9870)

This PR 

- fixes smgr metrics https://github.com/neondatabase/neon/issues/9925 
- adds an additional startup log line logging the current batching
config
- adds a histogram of batch sizes global and per-tenant
- adds a metric exposing the current batching config

The issue described in #9925 is that, before this PR, request latency was
only observed *after* batching.
This means that smgr latency metrics (most importantly getpage latency)
don't account for
- `wait_lsn` time 
- time spent waiting for the batch to fill up / for the executor stage to
pick up the batch.

The fix is to use a per-request batching timer, like we did before the
initial batching PR.
We funnel those timers through the entire request lifecycle.

I noticed that even before the initial batching changes, we weren't
accounting for the time spent writing & flushing the response to the
wire.
This PR drive-by fixes that deficiency by dropping the timers at the
very end of processing the batch, i.e., after the `pgb.flush()` call.
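
The shape of the fix, as a minimal self-contained sketch (the real type is `SmgrOpTimer` in the metrics changes below; the stages named in the comments are illustrative, not actual function names):

```rust
use std::time::Instant;

// Drop-based timer: latency is observed when the timer is dropped, so
// everything that happens before the drop is part of the measurement.
struct OpTimer {
    started_at: Instant,
}

impl Drop for OpTimer {
    fn drop(&mut self) {
        let elapsed = self.started_at.elapsed();
        // The real SmgrOpTimer observes into the global and
        // per-timeline histograms here.
        println!("smgr op took {elapsed:?}");
    }
}

fn main() {
    // The timer starts when the request is read off the wire ...
    let timer = OpTimer { started_at: Instant::now() };
    // ... and stays alive through wait_lsn, batch fill, execution,
    // and response marshalling ...
    drop(timer); // ... until after the final flush to the wire.
}
```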

I was **unable** to maintain the behavior of deducting time-spent-in-throttle
from the various latency metrics.
The reason is that we're using a *single* counter in `RequestContext` to
track micros spent in throttle.
But there are *N* metrics timers in the batch, one per request.
As a consequence, the practice of consuming the counter in the drop
handler of each timer no longer works: all but the first timer
encounter the error `close() called on closed state`.
A failed attempt to maintain the current behavior can be found in
https://github.com/neondatabase/neon/pull/9951.
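
A minimal illustration of that failure mode, using a stand-in type with the same `close()` semantics as the `optional_counter` module this PR deletes (see its listing below); this is not the actual pageserver code:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Stand-in for RequestContext::micros_spent_throttled:
// u32::MAX encodes the "closed" state.
struct Counter(AtomicU32);

impl Counter {
    fn new_open() -> Self {
        Counter(AtomicU32::new(0))
    }
    fn close(&self) -> Result<u32, &'static str> {
        match self.0.swap(u32::MAX, Ordering::Relaxed) {
            u32::MAX => Err("close() called on closed state"),
            x => Ok(x),
        }
    }
}

fn main() {
    // A batch of N requests means N metrics timers, but they all share
    // the single counter in the RequestContext.
    let ctx_counter = Counter::new_open();
    // The first timer's drop handler consumes the counter ...
    assert!(ctx_counter.close().is_ok());
    // ... and every subsequent timer's drop handler hits the closed state.
    assert_eq!(
        ctx_counter.close().unwrap_err(),
        "close() called on closed state"
    );
}
```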

So, this PR removes the deduction behavior from all metrics.
I started a discussion on Slack about the implications this has for
our internal SLO calculation:
https://neondb.slack.com/archives/C033RQ5SPDH/p1732910861704029

# Refs

- fixes https://github.com/neondatabase/neon/issues/9925
- sub-issue https://github.com/neondatabase/neon/issues/9377
- epic: https://github.com/neondatabase/neon/issues/9376
Christian Schwarz committed (via GitHub) on 2024-12-03 12:03:23 +01:00
parent 15d01b257a, commit cb10be710d
11 changed files with 373 additions and 396 deletions

View File

@@ -127,6 +127,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
// The tenants directory contains all the pageserver local disk state.
// Create if not exists and make sure all the contents are durable before proceeding.
@@ -302,7 +303,7 @@ fn start_pageserver(
pageserver::metrics::tokio_epoll_uring::Collector::new(),
))
.unwrap();
pageserver::preinitialize_metrics();
pageserver::preinitialize_metrics(conf);
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes

View File

@@ -91,8 +91,6 @@
use crate::task_mgr::TaskKind;
pub(crate) mod optional_counter;
// The main structure of this module, see module-level comment.
#[derive(Debug)]
pub struct RequestContext {
@@ -100,7 +98,6 @@ pub struct RequestContext {
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32,
}
/// The kind of access to the page cache.
@@ -158,7 +155,6 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
micros_spent_throttled: Default::default(),
},
}
}
@@ -172,7 +168,6 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
micros_spent_throttled: Default::default(),
},
}
}

View File

@@ -1,101 +0,0 @@
use std::{
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
#[derive(Debug)]
pub struct CounterU32 {
inner: AtomicU32,
}
impl Default for CounterU32 {
fn default() -> Self {
Self {
inner: AtomicU32::new(u32::MAX),
}
}
}
impl CounterU32 {
pub fn open(&self) -> Result<(), &'static str> {
match self
.inner
.compare_exchange(u32::MAX, 0, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => Ok(()),
Err(_) => Err("open() called on clsoed state"),
}
}
pub fn close(&self) -> Result<u32, &'static str> {
match self.inner.swap(u32::MAX, Ordering::Relaxed) {
u32::MAX => Err("close() called on closed state"),
x => Ok(x),
}
}
pub fn add(&self, count: u32) -> Result<(), &'static str> {
if count == 0 {
return Ok(());
}
let mut had_err = None;
self.inner
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| match cur {
u32::MAX => {
had_err = Some("add() called on closed state");
None
}
x => {
let (new, overflowed) = x.overflowing_add(count);
if new == u32::MAX || overflowed {
had_err = Some("add() overflowed the counter");
None
} else {
Some(new)
}
}
})
.map_err(|_| had_err.expect("we set it whenever the function returns None"))
.map(|_| ())
}
}
#[derive(Default, Debug)]
pub struct MicroSecondsCounterU32 {
inner: CounterU32,
}
impl MicroSecondsCounterU32 {
pub fn open(&self) -> Result<(), &'static str> {
self.inner.open()
}
pub fn add(&self, duration: Duration) -> Result<(), &'static str> {
match duration.as_micros().try_into() {
Ok(x) => self.inner.add(x),
Err(_) => Err("add(): duration conversion error"),
}
}
pub fn close_and_checked_sub_from(&self, from: Duration) -> Result<Duration, &'static str> {
let val = self.inner.close()?;
let val = Duration::from_micros(val as u64);
let subbed = match from.checked_sub(val) {
Some(v) => v,
None => return Err("Duration::checked_sub"),
};
Ok(subbed)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic() {
let counter = MicroSecondsCounterU32::default();
counter.open().unwrap();
counter.add(Duration::from_micros(23)).unwrap();
let res = counter
.close_and_checked_sub_from(Duration::from_micros(42))
.unwrap();
assert_eq!(res, Duration::from_micros(42 - 23));
}
}

View File

@@ -7,6 +7,10 @@ use metrics::{
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, QueryError};
use pq_proto::framed::ConnectionError;
@@ -1216,50 +1220,21 @@ pub(crate) mod virtual_file_io_engine {
});
}
struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
global_latency_histo: &'a Histogram,
pub(crate) struct SmgrOpTimer {
global_latency_histo: Histogram,
// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<&'a Histogram>,
per_timeline_latency_histo: Option<Histogram>,
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
count: usize,
start: Instant,
}
impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> {
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
let ex_throttled = self
.ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(res) => res,
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
rate_limit.call(|| {
warn!(op=?self.op, error, "error deducting time spent throttled; this message is logged at a global rate limit");
});
elapsed
}
};
for _ in 0..self.count {
self.global_latency_histo
.observe(ex_throttled.as_secs_f64());
if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64());
}
let elapsed = self.start.elapsed().as_secs_f64();
self.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(elapsed);
}
}
}
@@ -1289,6 +1264,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
global_latency: [Histogram; SmgrQueryType::COUNT],
per_timeline_getpage_started: IntCounter,
per_timeline_getpage_latency: Histogram,
global_batch_size: Histogram,
per_timeline_batch_size: Histogram,
}
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
@@ -1381,6 +1358,76 @@ static SMGR_QUERY_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static PAGE_SERVICE_BATCH_SIZE_BUCKETS_GLOBAL: Lazy<Vec<f64>> = Lazy::new(|| {
(1..=u32::try_from(Timeline::MAX_GET_VECTORED_KEYS).unwrap())
.map(|v| v.into())
.collect()
});
static PAGE_SERVICE_BATCH_SIZE_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_page_service_batch_size_global",
"Batch size of pageserver page service requests",
PAGE_SERVICE_BATCH_SIZE_BUCKETS_GLOBAL.clone(),
)
.expect("failed to define a metric")
});
static PAGE_SERVICE_BATCH_SIZE_BUCKETS_PER_TIMELINE: Lazy<Vec<f64>> = Lazy::new(|| {
let mut buckets = Vec::new();
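// Powers of two (1, 2, 4, ...) capped at MAX_GET_VECTORED_KEYS: coarser
// than the global buckets, presumably to keep per-timeline cardinality low.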
for i in 0.. {
let bucket = 1 << i;
if bucket > u32::try_from(Timeline::MAX_GET_VECTORED_KEYS).unwrap() {
break;
}
buckets.push(bucket.into());
}
buckets
});
static PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_page_service_batch_size",
"Batch size of pageserver page service requests",
&["tenant_id", "shard_id", "timeline_id"],
PAGE_SERVICE_BATCH_SIZE_BUCKETS_PER_TIMELINE.clone()
)
.expect("failed to define a metric")
});
pub(crate) static PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_page_service_config_max_batch_size",
"Configured maximum batch size for the server-side batching functionality of page_service. \
Labels expose more of the configuration parameters.",
&["mode", "execution"]
)
.expect("failed to define a metric")
});
fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE.reset();
let (label_values, value) = match conf {
PageServicePipeliningConfig::Serial => (["serial", "-"], 1),
PageServicePipeliningConfig::Pipelined(PageServicePipeliningConfigPipelined {
max_batch_size,
execution,
}) => {
let mode = "pipelined";
let execution = match execution {
PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
"concurrent-futures"
}
PageServiceProtocolPipelinedExecutionStrategy::Tasks => "tasks",
};
([mode, execution], max_batch_size.get())
}
};
PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE
.with_label_values(&label_values)
.set(value.try_into().unwrap());
}
impl SmgrQueryTimePerTimeline {
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
@@ -1416,78 +1463,51 @@ impl SmgrQueryTimePerTimeline {
])
.unwrap();
let global_batch_size = PAGE_SERVICE_BATCH_SIZE_GLOBAL.clone();
let per_timeline_batch_size = PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();
Self {
global_started,
global_latency,
per_timeline_getpage_latency,
per_timeline_getpage_started,
global_batch_size,
per_timeline_batch_size,
}
}
pub(crate) fn start_timer<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a> {
self.start_timer_many(op, 1, ctx)
}
pub(crate) fn start_timer_many<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
count: usize,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a> {
let start = Instant::now();
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
self.global_started[op as usize].inc();
// We subtract time spent throttled from the observed latency.
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[op];
rate_limit.call(|| {
warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
});
}
}
let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) {
self.per_timeline_getpage_started.inc();
Some(&self.per_timeline_getpage_latency)
Some(self.per_timeline_getpage_latency.clone())
} else {
None
};
Some(GlobalAndPerTimelineHistogramTimer {
global_latency_histo: &self.global_latency[op as usize],
SmgrOpTimer {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
ctx,
start,
op,
count,
})
start: started_at,
}
}
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
self.global_batch_size.observe(batch_size as f64);
self.per_timeline_batch_size.observe(batch_size as f64);
}
}
#[cfg(test)]
mod smgr_query_time_tests {
use std::time::Instant;
use pageserver_api::shard::TenantShardId;
use strum::IntoEnumIterator;
use utils::id::{TenantId, TimelineId};
use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
};
// Regression test, we used hard-coded string constants before using an enum.
#[test]
fn op_label_name() {
@@ -1531,8 +1551,7 @@ mod smgr_query_time_tests {
let (pre_global, pre_per_tenant_timeline) = get_counts();
assert_eq!(pre_per_tenant_timeline, 0);
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
let timer = metrics.start_timer(*op, &ctx);
let timer = metrics.start_smgr_op(*op, Instant::now());
drop(timer);
let (post_global, post_per_tenant_timeline) = get_counts();
@@ -1579,58 +1598,24 @@ pub(crate) static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|
}
});
pub(crate) struct BasebackupQueryTimeOngoingRecording<'a, 'c> {
pub(crate) struct BasebackupQueryTimeOngoingRecording<'a> {
parent: &'a BasebackupQueryTime,
ctx: &'c RequestContext,
start: std::time::Instant,
}
impl BasebackupQueryTime {
pub(crate) fn start_recording<'c: 'a, 'a>(
&'a self,
ctx: &'c RequestContext,
) -> BasebackupQueryTimeOngoingRecording<'a, 'a> {
pub(crate) fn start_recording(&self) -> BasebackupQueryTimeOngoingRecording<'_> {
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!(error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
});
}
}
BasebackupQueryTimeOngoingRecording {
parent: self,
ctx,
start,
}
}
}
impl BasebackupQueryTimeOngoingRecording<'_, '_> {
impl BasebackupQueryTimeOngoingRecording<'_> {
pub(crate) fn observe<T>(self, res: &Result<T, QueryError>) {
let elapsed = self.start.elapsed();
let ex_throttled = self
.ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(ex_throttled) => ex_throttled,
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!(error, "error deducting time spent throttled; this message is logged at a global rate limit");
});
elapsed
}
};
let elapsed = self.start.elapsed().as_secs_f64();
// If you want to change the categorization of a specific error, also change it in `log_query_error`.
let metric = match res {
Ok(_) => &self.parent.ok,
@@ -1641,7 +1626,7 @@ impl BasebackupQueryTimeOngoingRecording<'_, '_> {
}
Err(_) => &self.parent.error,
};
metric.observe(ex_throttled.as_secs_f64());
metric.observe(elapsed);
}
}
@@ -2722,6 +2707,11 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
let _ = PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
}
}
@@ -2747,10 +2737,12 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext};
use crate::task_mgr::TaskKind;
use crate::tenant::mgr::TenantSlot;
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::Timeline;
/// Maintain a per timeline gauge in addition to the global gauge.
pub(crate) struct PerTimelineRemotePhysicalSizeGauge {
@@ -3562,7 +3554,9 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
.set(u64::try_from(num_threads.get()).unwrap());
}
pub fn preinitialize_metrics() {
pub fn preinitialize_metrics(conf: &'static PageServerConf) {
set_page_service_config_max_batch_size(&conf.page_service_pipelining);
// Python tests need these and on some we do alerting.
//
// FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of
@@ -3630,6 +3624,7 @@ pub fn preinitialize_metrics() {
&WAL_REDO_RECORDS_HISTOGRAM,
&WAL_REDO_BYTES_HISTOGRAM,
&WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
&PAGE_SERVICE_BATCH_SIZE_GLOBAL,
]
.into_iter()
.for_each(|h| {

View File

@@ -51,7 +51,7 @@ use crate::auth::check_permission;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{self};
use crate::metrics::{self, SmgrOpTimer};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -540,11 +540,13 @@ impl From<WaitLsnError> for QueryError {
enum BatchedFeMessage {
Exists {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamNblocksRequest,
},
@@ -552,15 +554,17 @@ enum BatchedFeMessage {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
pages: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>,
},
DbSize {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
@@ -632,6 +636,8 @@ impl PageServerHandler {
msg = pgb.read_message() => { msg }
};
let received_at = Instant::now();
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => {
@@ -660,7 +666,15 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::Exists { span, shard, req }
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetRelExists, received_at);
BatchedFeMessage::Exists {
span,
timer,
shard,
req,
}
}
PagestreamFeMessage::Nblocks(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
@@ -668,7 +682,15 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::Nblocks { span, shard, req }
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetRelSize, received_at);
BatchedFeMessage::Nblocks {
span,
timer,
shard,
req,
}
}
PagestreamFeMessage::DbSize(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
@@ -676,7 +698,15 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::DbSize { span, shard, req }
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetDbSize, received_at);
BatchedFeMessage::DbSize {
span,
timer,
shard,
req,
}
}
PagestreamFeMessage::GetSlruSegment(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
@@ -684,7 +714,15 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::GetSlruSegment { span, shard, req }
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetSlruSegment, received_at);
BatchedFeMessage::GetSlruSegment {
span,
timer,
shard,
req,
}
}
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
@@ -728,6 +766,14 @@ impl PageServerHandler {
return respond_error!(e.into());
}
};
// It's important to start the timer before waiting for the LSN
// so that the _started counters are incremented before we do
// any serious waiting, e.g., for LSNs.
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetPageAtLsn, received_at);
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
request_lsn,
@@ -747,7 +793,7 @@ impl PageServerHandler {
span,
shard,
effective_request_lsn,
pages: smallvec::smallvec![(rel, blkno)],
pages: smallvec::smallvec![(rel, blkno, timer)],
}
}
};
@@ -832,88 +878,112 @@ impl PageServerHandler {
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
// invoke handler function
let (handler_results, span): (Vec<Result<PagestreamBeMessage, PageStreamError>>, _) =
match batch {
BatchedFeMessage::Exists { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
(
vec![
self.handle_get_rel_exists_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::Nblocks { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
(
vec![
self.handle_get_nblocks_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::GetPage {
let (handler_results, span): (
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), PageStreamError>>,
_,
) = match batch {
BatchedFeMessage::Exists {
span,
timer,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
(
vec![self
.handle_get_rel_exists_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))],
span,
shard,
effective_request_lsn,
pages,
} => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
(
{
let npages = pages.len();
trace!(npages, "handling getpage request");
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
BatchedFeMessage::DbSize { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
(
vec![
self.handle_db_size_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::GetSlruSegment { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
(
vec![
self.handle_get_slru_segment_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(vec![Err(error)], span)
}
};
)
}
BatchedFeMessage::Nblocks {
span,
timer,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
(
vec![self
.handle_get_nblocks_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))],
span,
)
}
BatchedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages,
} => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
(
{
let npages = pages.len();
trace!(npages, "handling getpage request");
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
BatchedFeMessage::DbSize {
span,
timer,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
(
vec![self
.handle_db_size_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))],
span,
)
}
BatchedFeMessage::GetSlruSegment {
span,
timer,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
(
vec![self
.handle_get_slru_segment_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))],
span,
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(vec![Err(error)], span)
}
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let mut timers: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(handler_results.len());
for handler_result in handler_results {
let response_msg = match handler_result {
Err(e) => match &e {
@@ -944,7 +1014,12 @@ impl PageServerHandler {
})
}
},
Ok(response_msg) => response_msg,
Ok((response_msg, timer)) => {
// Extending the lifetime of the timers so observations on drop
// include the flush time.
timers.push(timer);
response_msg
}
};
// marshal & transmit response message
@@ -961,6 +1036,7 @@ impl PageServerHandler {
res?;
}
}
drop(timers);
Ok(())
}
@@ -1423,10 +1499,6 @@ impl PageServerHandler {
req: &PagestreamExistsRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -1453,10 +1525,6 @@ impl PageServerHandler {
req: &PagestreamNblocksRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -1483,10 +1551,6 @@ impl PageServerHandler {
req: &PagestreamDbSizeRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -1512,26 +1576,41 @@ impl PageServerHandler {
&mut self,
timeline: &Timeline,
effective_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
requests: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>,
ctx: &RequestContext,
) -> Vec<Result<PagestreamBeMessage, PageStreamError>> {
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), PageStreamError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
let _timer = timeline.query_metrics.start_timer_many(
metrics::SmgrQueryType::GetPageAtLsn,
pages.len(),
ctx,
);
let pages = timeline
.get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
timeline
.query_metrics
.observe_getpage_batch_start(requests.len());
let results = timeline
.get_rel_page_at_lsn_batched(
requests.iter().map(|(reltag, blkno, _)| (reltag, blkno)),
effective_lsn,
ctx,
)
.await;
assert_eq!(results.len(), requests.len());
Vec::from_iter(pages.into_iter().map(|page| {
page.map(|page| {
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
})
.map_err(PageStreamError::from)
}))
// TODO: avoid creating the new Vec here
Vec::from_iter(
requests
.into_iter()
.zip(results.into_iter())
.map(|((_, _, timer), res)| {
res.map(|page| {
(
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
page,
}),
timer,
)
})
.map_err(PageStreamError::from)
}),
)
}
#[instrument(skip_all, fields(shard_id))]
@@ -1541,10 +1620,6 @@ impl PageServerHandler {
req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -2045,7 +2120,7 @@ where
COMPUTE_COMMANDS_COUNTERS
.for_command(ComputeCommandKind::Basebackup)
.inc();
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
let res = async {
self.handle_basebackup_request(
pgb,

View File

@@ -203,9 +203,13 @@ impl Timeline {
) -> Result<Bytes, PageReconstructError> {
match version {
Version::Lsn(effective_lsn) => {
let pages = smallvec::smallvec![(tag, blknum)];
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| (tag, blknum)),
effective_lsn,
ctx,
)
.await;
assert_eq!(res.len(), 1);
res.into_iter().next().unwrap()
@@ -240,7 +244,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
effective_lsn: Lsn,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
@@ -254,7 +258,7 @@ impl Timeline {
let result_slots = result.spare_capacity_mut();
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
for (response_slot_idx, (tag, blknum)) in pages.into_iter().enumerate() {
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -265,7 +269,7 @@ impl Timeline {
}
let nblocks = match self
.get_rel_size(tag, Version::Lsn(effective_lsn), ctx)
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
.await
{
Ok(nblocks) => nblocks,
@@ -276,7 +280,7 @@ impl Timeline {
}
};
if blknum >= nblocks {
if *blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag, blknum, effective_lsn, nblocks
@@ -286,7 +290,7 @@ impl Timeline {
continue;
}
let key = rel_block_to_key(tag, blknum);
let key = rel_block_to_key(*tag, *blknum);
let key_slots = keys_slots.entry(key).or_default();
key_slots.push(response_slot_idx);

View File

@@ -2,14 +2,14 @@ use std::{
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
Arc,
},
time::{Duration, Instant},
};
use arc_swap::ArcSwap;
use enumset::EnumSet;
use tracing::{error, warn};
use tracing::error;
use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
use crate::{context::RequestContext, task_mgr::TaskKind};
@@ -162,19 +162,6 @@ where
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
let observation = Observation { wait_time };
self.metric.observe_throttling(&observation);
match ctx.micros_spent_throttled.add(wait_time) {
Ok(res) => res,
Err(error) => {
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
static WARN_RATE_LIMIT: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut guard = WARN_RATE_LIMIT.lock().unwrap();
guard.call(move || {
warn!(error, "error adding time spent throttled; this message is logged at a global rate limit");
});
}
}
Some(wait_time)
} else {
None

View File

@@ -1059,7 +1059,8 @@ impl Timeline {
.map(|metric| (metric, Instant::now()));
// start counting after throttle so that throttle time
// is always less than observation time
// is always less than observation time and we don't
// underflow when computing `ex_throttled` below.
let throttled = self
.timeline_get_throttle
.throttle(ctx, key_count as usize)
@@ -1138,7 +1139,9 @@ impl Timeline {
.map(ScanLatencyOngoingRecording::start_recording);
// start counting after throttle so that throttle time
// is always less than observation time
// is always less than observation time and we don't
// underflow when computing the `ex_throttled` value in
// `recording.observe(throttled)` below.
let throttled = self
.timeline_get_throttle
// assume scan = 1 quota for now until we find a better way to process this

View File

@@ -173,6 +173,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),
counter("pageserver_tenant_throttling_count"),
*histogram("pageserver_page_service_batch_size"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
# "pageserver_directory_entries_count", -- only used if above a certain threshold
# "pageserver_broken_tenants_count" -- used only for broken

View File

@@ -167,18 +167,18 @@ def test_throughput(
@dataclass
class Metrics:
time: float
pageserver_getpage_count: float
pageserver_vectored_get_count: float
pageserver_batch_size_histo_sum: float
pageserver_batch_size_histo_count: float
compute_getpage_count: float
pageserver_cpu_seconds_total: float
def __sub__(self, other: "Metrics") -> "Metrics":
return Metrics(
time=self.time - other.time,
pageserver_getpage_count=self.pageserver_getpage_count
- other.pageserver_getpage_count,
pageserver_vectored_get_count=self.pageserver_vectored_get_count
- other.pageserver_vectored_get_count,
pageserver_batch_size_histo_sum=self.pageserver_batch_size_histo_sum
- other.pageserver_batch_size_histo_sum,
pageserver_batch_size_histo_count=self.pageserver_batch_size_histo_count
- other.pageserver_batch_size_histo_count,
compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
- other.pageserver_cpu_seconds_total,
@@ -187,8 +187,8 @@ def test_throughput(
def normalize(self, by) -> "Metrics":
return Metrics(
time=self.time / by,
pageserver_getpage_count=self.pageserver_getpage_count / by,
pageserver_vectored_get_count=self.pageserver_vectored_get_count / by,
pageserver_batch_size_histo_sum=self.pageserver_batch_size_histo_sum / by,
pageserver_batch_size_histo_count=self.pageserver_batch_size_histo_count / by,
compute_getpage_count=self.compute_getpage_count / by,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
)
@@ -202,11 +202,11 @@ def test_throughput(
pageserver_metrics = ps_http.get_metrics()
return Metrics(
time=time.time(),
pageserver_getpage_count=pageserver_metrics.query_one(
"pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"}
pageserver_batch_size_histo_sum=pageserver_metrics.query_one(
"pageserver_page_service_batch_size_sum"
).value,
pageserver_vectored_get_count=pageserver_metrics.query_one(
"pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"}
pageserver_batch_size_histo_count=pageserver_metrics.query_one(
"pageserver_page_service_batch_size_count"
).value,
compute_getpage_count=compute_getpage_count,
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
@@ -243,7 +243,7 @@ def test_throughput(
# Sanity-checks on the collected data
#
# assert that getpage counts roughly match between compute and ps
assert metrics.pageserver_getpage_count == pytest.approx(
assert metrics.pageserver_batch_size_histo_sum == pytest.approx(
metrics.compute_getpage_count, rel=0.01
)
@@ -256,7 +256,7 @@ def test_throughput(
zenbenchmark.record(
"perfmetric.batching_factor",
metrics.pageserver_getpage_count / metrics.pageserver_vectored_get_count,
metrics.pageserver_batch_size_histo_sum / metrics.pageserver_batch_size_histo_count,
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)

View File

@@ -4,6 +4,7 @@ import copy
import json
import uuid
import pytest
from anyio import Path
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
@@ -70,14 +71,21 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
log.info("warmup / make sure metrics are present")
run_pagebench_at_max_speed_and_get_total_requests_completed(2)
metrics_query = {
smgr_metrics_query = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"smgr_query_type": "get_page_at_lsn",
}
metric_name = "pageserver_smgr_query_seconds_sum"
smgr_query_seconds_pre = ps_http.get_metric_value(metric_name, metrics_query)
smgr_metric_name = "pageserver_smgr_query_seconds_sum"
throttle_metrics_query = {
"tenant_id": str(tenant_id),
}
throttle_metric_name = "pageserver_tenant_throttling_wait_usecs_sum_total"
smgr_query_seconds_pre = ps_http.get_metric_value(smgr_metric_name, smgr_metrics_query)
assert smgr_query_seconds_pre is not None
throttled_usecs_pre = ps_http.get_metric_value(throttle_metric_name, throttle_metrics_query)
assert throttled_usecs_pre is not None
marker = uuid.uuid4().hex
ps_http.post_tracing_event("info", marker)
@@ -108,14 +116,23 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
timeout=compaction_period,
)
log.info("validate that the metric doesn't include throttle wait time")
smgr_query_seconds_post = ps_http.get_metric_value(metric_name, metrics_query)
log.info("the smgr metric includes throttle time")
smgr_query_seconds_post = ps_http.get_metric_value(smgr_metric_name, smgr_metrics_query)
assert smgr_query_seconds_post is not None
throttled_usecs_post = ps_http.get_metric_value(throttle_metric_name, throttle_metrics_query)
assert throttled_usecs_post is not None
actual_smgr_query_seconds = smgr_query_seconds_post - smgr_query_seconds_pre
actual_throttled_usecs = throttled_usecs_post - throttled_usecs_pre
actual_throttled_secs = actual_throttled_usecs / 1_000_000
assert (
duration_secs >= 10 * actual_smgr_query_seconds
), "smgr metrics should not include throttle wait time"
pytest.approx(duration_secs, 0.1) == actual_smgr_query_seconds
), "smgr metrics include throttle wait time"
smgr_ex_throttle = actual_smgr_query_seconds - actual_throttled_secs
assert smgr_ex_throttle > 0
assert (
duration_secs > 10 * smgr_ex_throttle
), "most of the time in this test is spent throttled because the rate-limit's contribution to latency dominates"
throttle_config_with_field_fair_set = {