Compare commits

..

8 Commits

Author SHA1 Message Date
Christian Schwarz
8522f429eb port forward dd722fdaf6 (flush recording arcs outside SmgrOpTimerInner) 2025-02-07 11:49:10 +01:00
Christian Schwarz
6e1cbce715 Revert "experiment: clone the metrics but don't measure or update the metrics"
This reverts commit 949fbc15a0.
2025-02-07 11:28:14 +01:00
Christian Schwarz
949fbc15a0 experiment: clone the metrics but don't measure or update the metrics 2025-02-07 11:13:59 +01:00
Christian Schwarz
98dd19ef53 Revert "what if we Box the SmgrOpTimerInner"
This reverts commit a0e8b1617b.
2025-02-07 11:08:47 +01:00
Christian Schwarz
a0e8b1617b what if we Box the SmgrOpTimerInner 2025-02-07 11:02:30 +01:00
Christian Schwarz
1686d9e733 perf(page_service): dont .instrument(span.clone()) the response flush (#10686)
On my AX102 Hetzner box, removing this line removes about 20us from the
`latency_mean` result in

`test_pageserver_characterize_latencies_with_1_client_and_throughput_with_many_clients_one_tenant`.

If the same 20us can be removed in the nightly benchmark run, this will
be a ~10% improvement because there, mean latencies are about ~220us.

This span was added during batching refactors, we didn't have it before,
and I don't think it's terribly useful.

refs
- https://github.com/neondatabase/cloud/issues/21759
2025-02-06 08:33:37 +00:00
Erik Grinaker
abcd00181c pageserver: set a concurrency limit for LocalFS (#10676)
## Problem

The local filesystem backend for remote storage doesn't set a
concurrency limit. While it can't/won't enforce a concurrency limit
itself, this also bounds the upload queue concurrency. Some tests create
thousands of uploads, which slows down the quadratic scheduling of the
upload queue, and there is no point spawning that many Tokio tasks.

Resolves #10409.

## Summary of changes

Set a concurrency limit of 100 for the LocalFS backend.

Before: `test_layer_map[release-pg17].test_query: 68.338 s`
After: `test_layer_map[release-pg17].test_query: 5.209 s`
2025-02-06 07:24:36 +00:00
Konstantin Knizhnik
01f0be03b5 Fix bugs in lfc_cache_containsv (#10682)
## Problem

Incorrect manipulations with iteration index in `lfc_cache_containsv`

## Summary of changes

```
-		int		this_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
+		int		this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
```

```
-		if (i + 1 >= nblocks)
+		if (i >= nblocks)
```

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-02-06 07:00:00 +00:00
7 changed files with 114 additions and 127 deletions

View File

@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use crate::{
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
/// External backup storage configuration, enough for creating a client for that storage.
@@ -45,11 +45,11 @@ impl RemoteStorageKind {
impl RemoteStorageConfig {
/// Helper to fetch the configured concurrency limit.
pub fn concurrency_limit(&self) -> Option<usize> {
pub fn concurrency_limit(&self) -> usize {
match &self.storage {
RemoteStorageKind::LocalFs { .. } => None,
RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()),
RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()),
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
}
}
}

View File

@@ -65,6 +65,12 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// Here, a limit of max 20k concurrent connections was noted.
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
/// Set this limit analogously to the S3 limit.
///
/// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
/// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
/// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
/// No limits on the client side, which currently means 1000 for AWS S3.
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

View File

@@ -1253,9 +1253,6 @@ pub(crate) struct SmgrOpTimerInner {
global_batch_wait_time: Histogram,
per_timeline_batch_wait_time: Histogram,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
throttling: Arc<tenant_throttling::Pagestream>,
timings: SmgrOpTimerState,
@@ -1366,20 +1363,18 @@ impl SmgrOpTimer {
/// The first caller receives Some, subsequent ones None.
///
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_execution_end_flush_start(
&mut self,
at: Instant,
) -> Option<SmgrOpFlushInProgress> {
pub(crate) fn observe_execution_end_flush_start(&mut self, at: Instant) {
// NB: unlike the other observe_* methods, this one take()s.
#[allow(clippy::question_mark)] // maintain similar code pattern.
let Some(mut inner) = self.0.take() else {
return None;
// NB: this take() isn't needed anymore, maybe we can simplify
return;
};
let SmgrOpTimerState::Executing {
execution_started_at,
} = &inner.timings
else {
return None;
return;
};
// update metrics
let execution = at - *execution_started_at;
@@ -1394,36 +1389,9 @@ impl SmgrOpTimer {
// state transition
inner.timings = SmgrOpTimerState::Flushing;
// return the flush in progress object which
// will do the remaining metrics updates
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
Some(SmgrOpFlushInProgress {
flush_started_at: at,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
})
}
}
/// The last stage of request processing is serializing and flushing the request
/// into the TCP connection. We want to make slow flushes observable
/// _while they are occurring_, so this struct provides a wrapper method [`Self::measure`]
/// to periodically bump the metric.
///
/// If in the future we decide that we're not interested in live updates, we can
/// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there,
/// and remove this struct from the code base.
pub(crate) struct SmgrOpFlushInProgress {
/// Start of the interval that has not been added to the counters yet.
/// [`Self::measure`] resets it to the current time after every observation.
flush_started_at: Instant,
/// Counter that [`Self::measure`] increments by the elapsed microseconds of each observation.
global_micros: IntCounter,
/// Second counter incremented by the same amount on each observation
/// (the per-timeline one, going by the name).
per_timeline_micros: IntCounter,
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
// In case of early drop, update any of the remaining metrics with
@@ -1442,42 +1410,6 @@ impl Drop for SmgrOpTimer {
}
}
impl SmgrOpFlushInProgress {
/// Awaits `fut` and returns its output, adding the time spent waiting to
/// `global_micros` and `per_timeline_micros`.
///
/// An observation is made each time `fut` has not completed within 10 seconds,
/// and once more when `observe_guard` is dropped. Each observation adds the
/// microseconds elapsed since `flush_started_at` to both counters and then
/// resets `flush_started_at` to the current time.
///
/// # Panics
///
/// Panics if the elapsed microseconds do not fit into a `u64`.
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
// Pin on the stack so the same future can be polled again after each timeout.
let mut fut = std::pin::pin!(fut);
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `self.flush_started_at`.
let mut observe_guard = scopeguard::guard(
|| {
let now = Instant::now();
let elapsed = now - self.flush_started_at;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
// Advance the base so the next observation only counts new time.
self.flush_started_at = now;
},
// Drop handler: make one final observation.
|mut observe| {
observe();
},
);
loop {
// `&mut fut` keeps the future alive across timeouts; only the wait is bounded.
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
// Still flushing: publish the time accumulated so far.
(*observe_guard)();
}
}
}
}
}
#[derive(
Debug,
Clone,
@@ -1513,6 +1445,56 @@ pub(crate) struct SmgrQueryTimePerTimeline {
throttling: Arc<tenant_throttling::Pagestream>,
}
impl SmgrQueryTimePerTimeline {
/// Awaits `fut` and returns its output, adding the time spent waiting to the
/// flush-in-progress counters reached through `shard`.
///
/// An observation is made each time `fut` has not completed within 10 seconds,
/// and once more when `observe_guard` is dropped. Each observation adds the
/// microseconds elapsed since the previous one (initially since `start_at`) to both
/// `global_flush_in_progress_micros` and `per_timeline_flush_in_progress_micros`.
///
/// `shard` is upgraded only inside each observation. If the upgrade fails, that
/// observation is skipped and the base instant is left unchanged.
///
/// # Panics
///
/// Panics if the elapsed microseconds do not fit into a `u64`.
pub(crate) async fn record_flush_in_progress<Fut, O>(
shard: &crate::tenant::timeline::handle::WeakHandle<
crate::page_service::TenantManagerTypes,
>,
start_at: Instant,
mut fut: Fut,
) -> O
where
Fut: std::future::Future<Output = O>,
{
// Pin on the stack so the same future can be polled again after each timeout.
let mut fut = std::pin::pin!(fut);
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `base`.
let mut base = start_at;
let mut observe_guard = scopeguard::guard(
|| {
// Upgrade per observation; if it fails there is nothing to record into.
let Ok(upgraded) = shard.upgrade() else {
return;
};
let now = Instant::now();
let elapsed = now - base;
upgraded
.query_metrics
.global_flush_in_progress_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
upgraded
.query_metrics
.per_timeline_flush_in_progress_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
// Advance the base so the next observation only counts new time.
base = now;
},
// Drop handler: make one final observation.
|mut observe| {
observe();
},
);
loop {
// `&mut fut` keeps the future alive across timeouts; only the wait is bounded.
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
// Still flushing: publish the time accumulated so far.
(*observe_guard)();
}
}
}
}
}
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
@@ -1797,10 +1779,6 @@ impl SmgrQueryTimePerTimeline {
SmgrOpTimer(Some(SmgrOpTimerInner {
global_execution_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_execution_latency_histo: per_timeline_latency_histo,
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
global_batch_wait_time: self.global_batch_wait_time.clone(),
per_timeline_batch_wait_time: self.per_timeline_batch_wait_time.clone(),
throttling: self.throttling.clone(),

View File

@@ -1063,9 +1063,10 @@ impl PageServerHandler {
};
// invoke handler function
let (handler_results, span): (
let (handler_results, span, shard): (
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
_,
_,
) = match batch {
BatchedFeMessage::Exists {
span,
@@ -1082,6 +1083,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
BatchedFeMessage::Nblocks {
@@ -1099,6 +1101,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
BatchedFeMessage::GetPage {
@@ -1126,6 +1129,7 @@ impl PageServerHandler {
res
},
span,
Some(shard),
)
}
BatchedFeMessage::DbSize {
@@ -1143,6 +1147,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
BatchedFeMessage::GetSlruSegment {
@@ -1160,6 +1165,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
#[cfg(feature = "testing")]
@@ -1181,12 +1187,13 @@ impl PageServerHandler {
res
},
span,
Some(shard),
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(vec![Err(error)], span)
(vec![Err(error)], span, None)
}
};
@@ -1194,7 +1201,7 @@ impl PageServerHandler {
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for handler_result in handler_results {
let (response_msg, timer) = match handler_result {
let (response_msg, mut timer) = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1250,20 +1257,24 @@ impl PageServerHandler {
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer = timer.map(|mut timer| {
timer
.observe_execution_end_flush_start(Instant::now())
.expect("we are the first caller")
});
let start_flushing_at = Instant::now();
if let Some(timer) = &mut timer {
timer.observe_execution_end_flush_start(start_flushing_at);
}
// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
let flush_fut = if let Some(shard) = &shard {
// don't hold upgraded handle while flushing!
futures::future::Either::Left(
metrics::SmgrQueryTimePerTimeline::record_flush_in_progress(
shard,
start_flushing_at,
flush_fut,
),
)
} else {
futures::future::Either::Right(flush_fut)
};
// do it while respecting cancellation
let _: () = async move {
@@ -1280,8 +1291,6 @@ impl PageServerHandler {
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
Ok(())

View File

@@ -437,8 +437,7 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
.map_or(0, |r| r.concurrency_limit());
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
self.update_remote_physical_size_gauge(Some(index_part));
@@ -461,8 +460,7 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
.map_or(0, |r| r.concurrency_limit());
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
self.update_remote_physical_size_gauge(None);
@@ -484,8 +482,7 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
.map_or(0, |r| r.concurrency_limit());
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;

View File

@@ -326,7 +326,7 @@ pub struct Timeline {
// `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
// in `crate::page_service` writes these metrics.
pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
pub(crate) query_metrics: Arc<crate::metrics::SmgrQueryTimePerTimeline>,
directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
@@ -2517,11 +2517,11 @@ impl Timeline {
metrics,
query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
query_metrics: Arc::new(crate::metrics::SmgrQueryTimePerTimeline::new(
&tenant_shard_id,
&timeline_id,
resources.pagestream_throttle_metrics,
),
)),
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),

View File

@@ -509,47 +509,44 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
LWLockAcquire(lfc_lock, LW_SHARED);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return 0;
}
while (true)
{
int this_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
if (LFC_ENABLED())
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
if (entry != NULL)
{
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
{
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
if ((entry->bitmap[chunk_offs >> 5] &
((uint32)1 << (chunk_offs & 31))) != 0)
{
if ((entry->bitmap[chunk_offs >> 5] &
((uint32)1 << (chunk_offs & 31))) != 0)
{
BITMAP_SET(bitmap, i);
found++;
}
BITMAP_SET(bitmap, i);
found++;
}
}
else
{
i += this_chunk;
}
}
else
{
LWLockRelease(lfc_lock);
return found;
i += this_chunk;
}
/*
* Break out of the iteration before doing expensive stuff for
* a next iteration
*/
if (i + 1 >= nblocks)
if (i >= nblocks)
break;
/*