Compare commits

...

2 Commits

Author SHA1 Message Date
Alex Chi Z
f84c9f3d29 fix clippy
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-13 21:44:06 +01:00
Alex Chi Z
0937c51301 feat(pageserver): preempt image creation across timelines
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-13 21:14:07 +01:00
2 changed files with 79 additions and 20 deletions

View File

@@ -283,6 +283,53 @@ pub(crate) enum SpawnMode {
Lazy,
}
/// A notifier that can be used to trigger compaction shared by all timelines of a tenant.
///
/// It is used to notify the compaction loop that there is work to be done. It is also used
/// to balance the load of compaction between timelines. If there are pending L0 compaction in
/// one of the timeline, it could preempt long-running compaction jobs (e.g., image compaction)
/// on other timelines.
pub struct CompactionNotifier {
notify: Notify,
l0_count: std::sync::Mutex<HashMap<TimelineId, usize>>,
}
impl CompactionNotifier {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {
notify: Notify::new(),
l0_count: std::sync::Mutex::new(HashMap::new()),
}
}
pub fn notify_one(&self) {
self.notify.notify_one();
}
pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
self.notify.notified()
}
pub fn on_l0_update(&self, timeline_id: TimelineId, l0_count: usize) {
let mut guard = self.l0_count.lock().unwrap();
guard.insert(timeline_id, l0_count);
}
pub fn on_shutdown(&self, timeline_id: TimelineId) {
let mut guard = self.l0_count.lock().unwrap();
guard.remove(&timeline_id);
}
pub fn get_max_l0_count(&self) -> Option<(usize, TimelineId)> {
let guard = self.l0_count.lock().unwrap();
guard
.iter()
.max_by_key(|(_, count)| *count)
.map(|(timeline_id, count)| (*count, *timeline_id))
}
}
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -358,7 +405,7 @@ pub struct Tenant {
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
/// Signals the tenant compaction loop that there is L0 compaction work to be done.
pub(crate) l0_compaction_trigger: Arc<Notify>,
pub(crate) l0_compaction_trigger: Arc<CompactionNotifier>,
/// Scheduled gc-compaction tasks.
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
@@ -4242,7 +4289,7 @@ impl Tenant {
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
)),
l0_compaction_trigger: Arc::new(Notify::new()),
l0_compaction_trigger: Arc::new(CompactionNotifier::new()),
scheduled_compaction_tasks: Mutex::new(Default::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),

View File

@@ -47,7 +47,7 @@ use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, watch, Notify};
use tokio::sync::{oneshot, watch};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::critical;
@@ -151,7 +151,8 @@ use super::{
MaybeOffloaded,
};
use super::{
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, HeatMapTimeline,
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, CompactionNotifier,
HeatMapTimeline,
};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
@@ -224,7 +225,7 @@ pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
pub l0_compaction_trigger: Arc<Notify>,
pub l0_compaction_trigger: Arc<CompactionNotifier>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
@@ -428,7 +429,7 @@ pub struct Timeline {
compaction_failed: AtomicBool,
/// Notifies the tenant compaction loop that there is pending L0 compaction work.
l0_compaction_trigger: Arc<Notify>,
l0_compaction_trigger: Arc<CompactionNotifier>,
/// Make sure we only have one running gc at a time.
///
@@ -1965,6 +1966,8 @@ impl Timeline {
// ... and inform any waiters for newer LSNs that there won't be any.
self.last_record_lsn.shutdown();
self.l0_compaction_trigger.on_shutdown(self.timeline_id);
if let ShutdownMode::FreezeAndFlush = mode {
let do_flush = if let Some((open, frozen)) = self
.layers
@@ -4119,6 +4122,8 @@ impl Timeline {
if l0_count >= self.get_compaction_threshold() {
self.l0_compaction_trigger.notify_one();
}
self.l0_compaction_trigger
.on_l0_update(self.timeline_id, l0_count);
// Delay the next flush to backpressure if compaction can't keep up. We delay by the
// flush duration such that the flush takes 2x as long. This is propagated up to WAL
@@ -5067,20 +5072,22 @@ impl Timeline {
// image layer generation taking too long time and blocking L0 compaction. So in this
// mode, we also inspect the current number of L0 layers and skip image layer generation
// if there are too many of them.
let num_of_l0_layers = {
let layers = self.layers.read().await;
layers.layer_map()?.level0_deltas().len()
};
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
* self.get_compaction_threshold();
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
tracing::info!(
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
if let Some((max_num_of_l0_layers, timeline_id)) =
self.l0_compaction_trigger.get_max_l0_count()
{
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
* self.get_compaction_threshold();
if image_preempt_threshold != 0
&& max_num_of_l0_layers >= image_preempt_threshold
{
tracing::info!(
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {} on timeline {}",
partition.start().unwrap(), partition.end().unwrap(), max_num_of_l0_layers, timeline_id
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
}
}
}
}
@@ -5459,8 +5466,13 @@ impl Timeline {
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
let l0_count = guard.layer_map()?.level0_deltas().len();
drop_wlock(guard);
self.l0_compaction_trigger
.on_l0_update(self.timeline_id, l0_count);
Ok(())
}