From 8a8c656c0646895fbfc7a7bc5df4a93917071032 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 21 Jan 2025 22:18:09 +0100 Subject: [PATCH] pageserver: add `LayerMap::watch_layer0_deltas()` (#10470) ## Problem For compaction backpressure, we need a mechanism to signal when compaction has reduced the L0 delta layer count below the backpressure threshold. Extracted from #10405. ## Summary of changes Add `LayerMap::watch_level0_deltas()` which returns a `tokio::sync::watch::Receiver` signalling the current L0 delta layer count. --- pageserver/src/tenant/layer_map.rs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 1b6924425c..a69cce932e 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -57,6 +57,7 @@ use std::collections::{HashMap, VecDeque}; use std::iter::Peekable; use std::ops::Range; use std::sync::Arc; +use tokio::sync::watch; use utils::lsn::Lsn; use historic_layer_coverage::BufferedHistoricLayerCoverage; @@ -67,7 +68,6 @@ use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc}; /// /// LayerMap tracks what layers exist on a timeline. /// -#[derive(Default)] pub struct LayerMap { // // 'open_layer' holds the current InMemoryLayer that is accepting new @@ -93,7 +93,25 @@ pub struct LayerMap { /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. + /// + /// NB: make sure to notify `watch_l0_deltas` on changes. l0_delta_layers: Vec>, + + /// Notifies about L0 delta layer changes, sending the current number of L0 layers. + watch_l0_deltas: watch::Sender, +} + +impl Default for LayerMap { + fn default() -> Self { + Self { + open_layer: Default::default(), + next_open_layer_at: Default::default(), + frozen_layers: Default::default(), + historic: Default::default(), + l0_delta_layers: Default::default(), + watch_l0_deltas: watch::channel(0).0, + } + } } /// The primary update API for the layer map. @@ -466,6 +484,8 @@ impl LayerMap { if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) { self.l0_delta_layers.push(layer_desc.clone().into()); + self.watch_l0_deltas + .send_replace(self.l0_delta_layers.len()); } self.historic.insert( @@ -488,6 +508,8 @@ impl LayerMap { let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers); l0_delta_layers.retain(|other| other.key() != layer_key); self.l0_delta_layers = l0_delta_layers; + self.watch_l0_deltas + .send_replace(self.l0_delta_layers.len()); // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers, // there's a chance that the comparison fails at runtime due to it comparing (pointer, // vtable) pairs. @@ -850,6 +872,11 @@ impl LayerMap { &self.l0_delta_layers } + /// Subscribes to L0 delta layer changes, sending the current number of L0 delta layers. + pub fn watch_level0_deltas(&self) -> watch::Receiver { + self.watch_l0_deltas.subscribe() + } + /// debugging function to print out the contents of the layer map #[allow(unused)] pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {