mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 23:20:40 +00:00
feat(pageserver): preempt image creation across timelines
Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -283,6 +283,52 @@ pub(crate) enum SpawnMode {
|
||||
Lazy,
|
||||
}
|
||||
|
||||
/// A notifier that can be used to trigger compaction shared by all timelines of a tenant.
|
||||
///
|
||||
/// It is used to notify the compaction loop that there is work to be done. It is also used
|
||||
/// to balance the load of compaction between timelines. If there are pending L0 compaction in
|
||||
/// one of the timeline, it could preempt long-running compaction jobs (e.g., image compaction)
|
||||
/// on other timelines.
|
||||
pub struct CompactionNotifier {
|
||||
notify: Notify,
|
||||
l0_count: std::sync::Mutex<HashMap<TimelineId, usize>>,
|
||||
}
|
||||
|
||||
impl CompactionNotifier {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
notify: Notify::new(),
|
||||
l0_count: std::sync::Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn notify_one(&self) {
|
||||
self.notify.notify_one();
|
||||
}
|
||||
|
||||
pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
|
||||
self.notify.notified()
|
||||
}
|
||||
|
||||
pub fn on_l0_update(&self, timeline_id: TimelineId, l0_count: usize) {
|
||||
let mut guard = self.l0_count.lock().unwrap();
|
||||
guard.insert(timeline_id, l0_count);
|
||||
}
|
||||
|
||||
pub fn on_shutdown(&self, timeline_id: TimelineId) {
|
||||
let mut guard = self.l0_count.lock().unwrap();
|
||||
guard.remove(&timeline_id);
|
||||
}
|
||||
|
||||
pub fn get_max_l0_count(&self) -> Option<(usize, TimelineId)> {
|
||||
let guard = self.l0_count.lock().unwrap();
|
||||
guard
|
||||
.iter()
|
||||
.max_by_key(|(_, count)| *count)
|
||||
.map(|(timeline_id, count)| (*count, *timeline_id))
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Tenant consists of multiple timelines. Keep them in a hash table.
|
||||
///
|
||||
@@ -358,7 +404,7 @@ pub struct Tenant {
|
||||
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
|
||||
|
||||
/// Signals the tenant compaction loop that there is L0 compaction work to be done.
|
||||
pub(crate) l0_compaction_trigger: Arc<Notify>,
|
||||
pub(crate) l0_compaction_trigger: Arc<CompactionNotifier>,
|
||||
|
||||
/// Scheduled gc-compaction tasks.
|
||||
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
|
||||
@@ -4242,7 +4288,7 @@ impl Tenant {
|
||||
// use an extremely long backoff.
|
||||
Some(Duration::from_secs(3600 * 24)),
|
||||
)),
|
||||
l0_compaction_trigger: Arc::new(Notify::new()),
|
||||
l0_compaction_trigger: Arc::new(CompactionNotifier::new()),
|
||||
scheduled_compaction_tasks: Mutex::new(Default::default()),
|
||||
activate_now_sem: tokio::sync::Semaphore::new(0),
|
||||
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
|
||||
|
||||
@@ -47,7 +47,7 @@ use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::{oneshot, watch, Notify};
|
||||
use tokio::sync::{oneshot, watch};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::critical;
|
||||
@@ -151,7 +151,8 @@ use super::{
|
||||
MaybeOffloaded,
|
||||
};
|
||||
use super::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, HeatMapTimeline,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, CompactionNotifier,
|
||||
HeatMapTimeline,
|
||||
};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{
|
||||
@@ -224,7 +225,7 @@ pub struct TimelineResources {
|
||||
pub remote_client: RemoteTimelineClient,
|
||||
pub pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
|
||||
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
|
||||
pub l0_compaction_trigger: Arc<Notify>,
|
||||
pub l0_compaction_trigger: Arc<CompactionNotifier>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
}
|
||||
|
||||
@@ -428,7 +429,7 @@ pub struct Timeline {
|
||||
compaction_failed: AtomicBool,
|
||||
|
||||
/// Notifies the tenant compaction loop that there is pending L0 compaction work.
|
||||
l0_compaction_trigger: Arc<Notify>,
|
||||
l0_compaction_trigger: Arc<CompactionNotifier>,
|
||||
|
||||
/// Make sure we only have one running gc at a time.
|
||||
///
|
||||
@@ -1965,6 +1966,8 @@ impl Timeline {
|
||||
// ... and inform any waiters for newer LSNs that there won't be any.
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
self.l0_compaction_trigger.on_shutdown(self.timeline_id);
|
||||
|
||||
if let ShutdownMode::FreezeAndFlush = mode {
|
||||
let do_flush = if let Some((open, frozen)) = self
|
||||
.layers
|
||||
@@ -4119,6 +4122,8 @@ impl Timeline {
|
||||
if l0_count >= self.get_compaction_threshold() {
|
||||
self.l0_compaction_trigger.notify_one();
|
||||
}
|
||||
self.l0_compaction_trigger
|
||||
.on_l0_update(self.timeline_id, l0_count);
|
||||
|
||||
// Delay the next flush to backpressure if compaction can't keep up. We delay by the
|
||||
// flush duration such that the flush takes 2x as long. This is propagated up to WAL
|
||||
@@ -5067,20 +5072,22 @@ impl Timeline {
|
||||
// image layer generation taking too long time and blocking L0 compaction. So in this
|
||||
// mode, we also inspect the current number of L0 layers and skip image layer generation
|
||||
// if there are too many of them.
|
||||
let num_of_l0_layers = {
|
||||
let layers = self.layers.read().await;
|
||||
layers.layer_map()?.level0_deltas().len()
|
||||
};
|
||||
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
if let Some((max_num_of_l0_layers, timeline_id)) =
|
||||
self.l0_compaction_trigger.get_max_l0_count()
|
||||
{
|
||||
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0
|
||||
&& max_num_of_l0_layers >= image_preempt_threshold
|
||||
{
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {} on timeline {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), max_num_of_l0_layers, timeline_id
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5459,8 +5466,13 @@ impl Timeline {
|
||||
self.remote_client
|
||||
.schedule_compaction_update(&remove_layers, new_deltas)?;
|
||||
|
||||
let l0_count = guard.layer_map()?.level0_deltas().len();
|
||||
|
||||
drop_wlock(guard);
|
||||
|
||||
self.l0_compaction_trigger
|
||||
.on_l0_update(self.timeline_id, l0_count);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user