Mirror of https://github.com/neondatabase/neon.git
@@ -14,6 +14,50 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;

static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
    once_cell::sync::Lazy::new(|| {
        let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
        let permits = usize::max(
            1,
            // while a lot of the work is done on spawn_blocking, we still do
            // repartitioning in the async context. this should leave us some workers
            // unblocked to be blocked on other work, hopefully easing any outside-visible
            // effects of restarts.
            //
            // 6/8 is a guess; previously we ran with unlimited 8 and more from
            // spawn_blocking.
            (total_threads * 3).checked_div(4).unwrap_or(0),
        );
        assert_ne!(permits, 0, "we will not be adding in permits later");
        assert!(
            permits < total_threads,
            "need threads avail for shorter work"
        );
        tokio::sync::Semaphore::new(permits)
    });
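A quick worked example of the permit budget above (illustrative only; `background_permits` is a hypothetical helper that just restates the `total_threads * 3 / 4` formula): with the default of 8 background worker threads it yields 6 permits, matching the "6/8" guess in the comment and leaving two workers free for shorter tasks.

// Illustrative sketch, not part of the diff.
fn background_permits(total_threads: usize) -> usize {
    usize::max(1, (total_threads * 3).checked_div(4).unwrap_or(0))
}

#[test]
fn permit_budget_examples() {
    assert_eq!(background_permits(8), 6); // the "6/8" guess from the comment above
    assert_eq!(background_permits(4), 3); // still leaves one worker unblocked
    assert_eq!(background_permits(2), 1); // floor of one permit
}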

pub(crate) enum RateLimitError {
    Cancelled,
}

pub(crate) async fn concurrent_background_tasks_rate_limit(
    _ctx: &RequestContext,
    cancel: &CancellationToken,
) -> Result<impl Drop, RateLimitError> {
    // TODO: use request context TaskKind to get statistics on how many tasks of what kind are waiting for background task semaphore
    tokio::select! {
        permit = CONCURRENT_BACKGROUND_TASKS.acquire() => {
            match permit {
                Ok(permit) => Ok(permit),
                Err(_closed) => unreachable!("we never close the semaphore"),
            }
        },
        _ = cancel.cancelled() => {
            Err(RateLimitError::Cancelled)
        }
    }
}

/// Start per tenant background loops: compaction and gc.
pub fn start_background_loops(
    tenant: &Arc<Tenant>,
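For context, a minimal self-contained sketch of the "acquire a permit or bail out on cancellation" pattern that the helper above implements (the names `BACKGROUND_TASKS` and `run_one_background_pass` are illustrative, not from this diff). The permit is held while the work runs and released on drop, much like the `_permit` bindings in the hunks below.

use once_cell::sync::Lazy;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

static BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(2));

async fn run_one_background_pass(cancel: &CancellationToken) -> Option<()> {
    // Wait for a permit, but give up immediately if shutdown was requested.
    let _permit = tokio::select! {
        permit = BACKGROUND_TASKS.acquire() => permit.expect("semaphore is never closed"),
        _ = cancel.cancelled() => return None,
    };
    // ... do the rate-limited background work while `_permit` is held ...
    Some(()) // the permit is released when `_permit` is dropped here
}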
@@ -44,6 +44,7 @@ use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
    DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStats, LayerFileName, RemoteLayer,
};
use crate::tenant::tasks::RateLimitError;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
    layer_map::{LayerMap, SearchResult},
@@ -158,7 +159,7 @@ pub struct Timeline {

    /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
    /// Never changes for the lifetime of this [`Timeline`] object.
    ///
    /// This duplicates the generation stored in LocationConf, but that structure is mutable:
    /// this copy enforces the invariant that the generation doesn't change during a Tenant's lifetime.
    generation: Generation,
@@ -684,38 +685,13 @@ impl Timeline {
    ) -> anyhow::Result<()> {
        const ROUNDS: usize = 2;

        // static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
        //     once_cell::sync::Lazy::new(|| {
        //         let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
        //         let permits = usize::max(
        //             1,
        //             // while a lot of the work is done on spawn_blocking, we still do
        //             // repartitioning in the async context. this should leave us some workers
        //             // unblocked to be blocked on other work, hopefully easing any outside-visible
        //             // effects of restarts.
        //             //
        //             // 6/8 is a guess; previously we ran with unlimited 8 and more from
        //             // spawn_blocking.
        //             (total_threads * 3).checked_div(4).unwrap_or(0),
        //         );
        //         assert_ne!(permits, 0, "we will not be adding in permits later");
        //         assert!(
        //             permits < total_threads,
        //             "need threads avail for shorter work"
        //         );
        //         tokio::sync::Semaphore::new(permits)
        //     });

        // // this wait probably never needs any "long time spent" logging, because we already nag if
        // // the compaction task goes over its period (20s), which is quite often in production.
        // let _permit = tokio::select! {
        //     permit = CONCURRENT_COMPACTIONS.acquire() => {
        //         permit
        //     },
        //     _ = cancel.cancelled() => {
        //         return Ok(());
        //     }
        // };
        // this wait probably never needs any "long time spent" logging, because we already nag if
        // the compaction task goes over its period (20s), which is quite often in production.
        let _permit = match super::tasks::concurrent_background_tasks_rate_limit(ctx, cancel).await
        {
            Ok(permit) => permit,
            Err(RateLimitError::Cancelled) => return Ok(()),
        };

        let last_record_lsn = self.get_last_record_lsn();

@@ -31,7 +31,7 @@ use crate::{
        config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
        storage_layer::PersistentLayer,
        timeline::EvictionError,
        LogicalSizeCalculationCause, Tenant,
        LogicalSizeCalculationCause, Tenant, tasks::RateLimitError,
    },
};

@@ -150,6 +150,11 @@ impl Timeline {
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

        let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit(ctx, cancel).await {
            Ok(permit) => permit,
            Err(RateLimitError::Cancelled) => return ControlFlow::Break(()),
        };

        // If we evict layers but keep cached values derived from those layers, then
        // we face a storm of on-demand downloads after pageserver restart.
        // The reason is that the restart empties the caches, and so, the values