mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
bring back spawn_blocking for compact_level0_phase1 (#4537)
The stats for `compact_level0_phase` that I added in #4527 show the following breakdown (24h data from prod, only looking at compactions with > 1 L1 produced): * 10%ish of wall-clock time spent between the two read locks * I learned that the `DeltaLayer::iter()` and `DeltaLayer::key_iter()` calls actually do IO, even before we call `.next()`. I suspect that is why they take so much time between the locks. * 80+% of wall-clock time spent writing layer files * Lock acquisition time is irrelevant (low double-digit microseconds at most) * The generation of the holes holds the read lock for a relatively long time and it's proportional to the amount of keys / IO required to iterate over them (max: 110ms in prod; staging (nightly benchmarks): multiple seconds). Find below screenshots from my ad-hoc spreadsheet + some graphs. <img width="1182" alt="image" src="https://github.com/neondatabase/neon/assets/956573/81398b3f-6fa1-40dd-9887-46a4715d9194"> <img width="901" alt="image" src="https://github.com/neondatabase/neon/assets/956573/e4ac0393-f2c1-4187-a5e5-39a8b0c394c9"> <img width="210" alt="image" src="https://github.com/neondatabase/neon/assets/956573/7977ade7-6aa5-4773-a0a2-f9729aecee0d"> ## Changes In This PR This PR makes the following changes: * rearrange the `compact_level0_phase1` code such that we build the `all_keys_iter` and `all_values_iter` later than before * only grab the `Timeline::layers` lock once, and hold it until we've computed the holes * run compact_level0_phase1 in spawn_blocking, pre-grabbing the `Timeline::layers` lock in the async code and passing it in as an `OwnedRwLockReadGuard`. * the code inside spawn_blocking drops this guard after computing the holds * the `OwnedRwLockReadGuard` requires the `Timeline::layers` to be wrapped in an `Arc`. I think that's Ok, the locking for the RwLock is more heavy-weight than an additional pointer indirection. ## Alternatives Considered The naive alternative is to throw the entire function into `spawn_blocking`, and use `blocking_read` for `Timeline::layers` access. What I've done in this PR is better because, with this alternative, 1. while we `blocking_read()`, we'd waste one slot in the spawn_blocking pool 2. there's deadlock risk because the spawn_blocking pool is a finite resource  ## Metadata Fixes https://github.com/neondatabase/neon/issues/4492
This commit is contained in:
committed by
GitHub
parent
c215389f1c
commit
44a441080d
@@ -129,7 +129,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
pub(crate) layers: Arc<tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
@@ -1418,7 +1418,7 @@ impl Timeline {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
layers: tokio::sync::RwLock::new(LayerMap::default()),
|
||||
layers: Arc::new(tokio::sync::RwLock::new(LayerMap::default())),
|
||||
wanted_image_layers: Mutex::new(None),
|
||||
|
||||
walredo_mgr,
|
||||
@@ -3370,14 +3370,14 @@ struct CompactLevel0Phase1StatsBuilder {
|
||||
version: Option<u64>,
|
||||
tenant_id: Option<TenantId>,
|
||||
timeline_id: Option<TimelineId>,
|
||||
first_read_lock_acquisition_micros: DurationRecorder,
|
||||
get_level0_deltas_plus_drop_lock_micros: DurationRecorder,
|
||||
level0_deltas_count: Option<usize>,
|
||||
time_spent_between_locks: DurationRecorder,
|
||||
second_read_lock_acquisition_micros: DurationRecorder,
|
||||
second_read_lock_held_micros: DurationRecorder,
|
||||
sort_holes_micros: DurationRecorder,
|
||||
read_lock_acquisition_micros: DurationRecorder,
|
||||
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
|
||||
read_lock_held_prerequisites_micros: DurationRecorder,
|
||||
read_lock_held_compute_holes_micros: DurationRecorder,
|
||||
read_lock_drop_micros: DurationRecorder,
|
||||
prepare_iterators_micros: DurationRecorder,
|
||||
write_layer_files_micros: DurationRecorder,
|
||||
level0_deltas_count: Option<usize>,
|
||||
new_deltas_count: Option<usize>,
|
||||
new_deltas_size: Option<u64>,
|
||||
}
|
||||
@@ -3390,14 +3390,14 @@ struct CompactLevel0Phase1Stats {
|
||||
tenant_id: TenantId,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
timeline_id: TimelineId,
|
||||
first_read_lock_acquisition_micros: RecordedDuration,
|
||||
get_level0_deltas_plus_drop_lock_micros: RecordedDuration,
|
||||
level0_deltas_count: usize,
|
||||
time_spent_between_locks: RecordedDuration,
|
||||
second_read_lock_acquisition_micros: RecordedDuration,
|
||||
second_read_lock_held_micros: RecordedDuration,
|
||||
sort_holes_micros: RecordedDuration,
|
||||
read_lock_acquisition_micros: RecordedDuration,
|
||||
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
|
||||
read_lock_held_prerequisites_micros: RecordedDuration,
|
||||
read_lock_held_compute_holes_micros: RecordedDuration,
|
||||
read_lock_drop_micros: RecordedDuration,
|
||||
prepare_iterators_micros: RecordedDuration,
|
||||
write_layer_files_micros: RecordedDuration,
|
||||
level0_deltas_count: usize,
|
||||
new_deltas_count: usize,
|
||||
new_deltas_size: u64,
|
||||
}
|
||||
@@ -3406,54 +3406,51 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
|
||||
let CompactLevel0Phase1StatsBuilder {
|
||||
version,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
first_read_lock_acquisition_micros,
|
||||
get_level0_deltas_plus_drop_lock_micros,
|
||||
level0_deltas_count,
|
||||
time_spent_between_locks,
|
||||
second_read_lock_acquisition_micros,
|
||||
second_read_lock_held_micros,
|
||||
sort_holes_micros,
|
||||
write_layer_files_micros,
|
||||
new_deltas_count,
|
||||
new_deltas_size,
|
||||
} = value;
|
||||
Ok(CompactLevel0Phase1Stats {
|
||||
version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?,
|
||||
tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?,
|
||||
timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?,
|
||||
first_read_lock_acquisition_micros: first_read_lock_acquisition_micros
|
||||
Ok(Self {
|
||||
version: value.version.ok_or_else(|| anyhow!("version not set"))?,
|
||||
tenant_id: value
|
||||
.tenant_id
|
||||
.ok_or_else(|| anyhow!("tenant_id not set"))?,
|
||||
timeline_id: value
|
||||
.timeline_id
|
||||
.ok_or_else(|| anyhow!("timeline_id not set"))?,
|
||||
read_lock_acquisition_micros: value
|
||||
.read_lock_acquisition_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?,
|
||||
get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros
|
||||
.ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
|
||||
read_lock_held_spawn_blocking_startup_micros: value
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set")
|
||||
})?,
|
||||
level0_deltas_count: level0_deltas_count
|
||||
.ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?,
|
||||
time_spent_between_locks: time_spent_between_locks
|
||||
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
|
||||
read_lock_held_prerequisites_micros: value
|
||||
.read_lock_held_prerequisites_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?,
|
||||
second_read_lock_acquisition_micros: second_read_lock_acquisition_micros
|
||||
.ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
|
||||
read_lock_held_compute_holes_micros: value
|
||||
.read_lock_held_compute_holes_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?,
|
||||
second_read_lock_held_micros: second_read_lock_held_micros
|
||||
.ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
|
||||
read_lock_drop_micros: value
|
||||
.read_lock_drop_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?,
|
||||
sort_holes_micros: sort_holes_micros
|
||||
.ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
|
||||
prepare_iterators_micros: value
|
||||
.prepare_iterators_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?,
|
||||
write_layer_files_micros: write_layer_files_micros
|
||||
.ok_or_else(|| anyhow!("prepare_iterators_micros not set"))?,
|
||||
write_layer_files_micros: value
|
||||
.write_layer_files_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?,
|
||||
new_deltas_count: new_deltas_count
|
||||
.ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?,
|
||||
new_deltas_size: new_deltas_size
|
||||
.ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?,
|
||||
.ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
|
||||
level0_deltas_count: value
|
||||
.level0_deltas_count
|
||||
.ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
|
||||
new_deltas_count: value
|
||||
.new_deltas_count
|
||||
.ok_or_else(|| anyhow!("new_deltas_count not set"))?,
|
||||
new_deltas_size: value
|
||||
.new_deltas_size
|
||||
.ok_or_else(|| anyhow!("new_deltas_size not set"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -3464,30 +3461,18 @@ impl Timeline {
|
||||
/// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
|
||||
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
|
||||
/// start of level0 files compaction, the on-demand download should be revisited as well.
|
||||
async fn compact_level0_phase1(
|
||||
&self,
|
||||
fn compact_level0_phase1(
|
||||
self: Arc<Self>,
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layers: tokio::sync::OwnedRwLockReadGuard<LayerMap<dyn PersistentLayer>>,
|
||||
mut stats: CompactLevel0Phase1StatsBuilder,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
let mut stats = CompactLevel0Phase1StatsBuilder {
|
||||
version: Some(1),
|
||||
tenant_id: Some(self.tenant_id),
|
||||
timeline_id: Some(self.timeline_id),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let begin = tokio::time::Instant::now();
|
||||
let layers = self.layers.read().await;
|
||||
let now = tokio::time::Instant::now();
|
||||
stats.first_read_lock_acquisition_micros =
|
||||
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
|
||||
stats.read_lock_held_spawn_blocking_startup_micros =
|
||||
stats.read_lock_acquisition_micros.till_now(); // set by caller
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
stats.level0_deltas_count = Some(level0_deltas.len());
|
||||
stats.get_level0_deltas_plus_drop_lock_micros =
|
||||
stats.first_read_lock_acquisition_micros.till_now();
|
||||
|
||||
// Only compact if enough layers have accumulated.
|
||||
let threshold = self.get_compaction_threshold();
|
||||
if level0_deltas.is_empty() || level0_deltas.len() < threshold {
|
||||
@@ -3565,6 +3550,53 @@ impl Timeline {
|
||||
// we don't accidentally use it later in the function.
|
||||
drop(level0_deltas);
|
||||
|
||||
stats.read_lock_held_prerequisites_micros = stats
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.till_now();
|
||||
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev: Option<Key> = None;
|
||||
for (next_key, _next_lsn, _size) in itertools::process_results(
|
||||
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|
||||
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
|
||||
)? {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter
|
||||
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
|
||||
let key_range = prev_key..next_key;
|
||||
// Measuring hole by just subtraction of i128 representation of key range boundaries
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
coverage_size,
|
||||
});
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
prev = Some(next_key.next());
|
||||
}
|
||||
stats.read_lock_held_compute_holes_micros =
|
||||
stats.read_lock_held_prerequisites_micros.till_now();
|
||||
drop(layers);
|
||||
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_unstable_by_key(|hole| hole.key_range.start);
|
||||
let mut next_hole = 0; // index of next hole in holes vector
|
||||
|
||||
// This iterator walks through all key-value pairs from all the layers
|
||||
// we're compacting, in key, LSN order.
|
||||
let all_values_iter = itertools::process_results(
|
||||
@@ -3604,50 +3636,7 @@ impl Timeline {
|
||||
},
|
||||
)?;
|
||||
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now();
|
||||
let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here?
|
||||
stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev: Option<Key> = None;
|
||||
for (next_key, _next_lsn, _size) in itertools::process_results(
|
||||
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|
||||
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
|
||||
)? {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter
|
||||
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
|
||||
let key_range = prev_key..next_key;
|
||||
// Measuring hole by just subtraction of i128 representation of key range boundaries
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
coverage_size,
|
||||
});
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
prev = Some(next_key.next());
|
||||
}
|
||||
drop(layers);
|
||||
stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now();
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_unstable_by_key(|hole| hole.key_range.start);
|
||||
let mut next_hole = 0; // index of next hole in holes vector
|
||||
stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now();
|
||||
stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now();
|
||||
|
||||
// Merge the contents of all the input delta layers into a new set
|
||||
// of delta layers, based on the current partitioning.
|
||||
@@ -3807,7 +3796,7 @@ impl Timeline {
|
||||
layer_paths.pop().unwrap();
|
||||
}
|
||||
|
||||
stats.write_layer_files_micros = stats.sort_holes_micros.till_now();
|
||||
stats.write_layer_files_micros = stats.prepare_iterators_micros.till_now();
|
||||
stats.new_deltas_count = Some(new_layers.len());
|
||||
stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum());
|
||||
|
||||
@@ -3846,9 +3835,36 @@ impl Timeline {
|
||||
let CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
} = self
|
||||
.compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx)
|
||||
.await?;
|
||||
} = {
|
||||
let phase1_span = info_span!("compact_level0_phase1");
|
||||
let myself = Arc::clone(self);
|
||||
let ctx = ctx.attached_child(); // technically, the spawn_blocking can outlive this future
|
||||
let mut stats = CompactLevel0Phase1StatsBuilder {
|
||||
version: Some(2),
|
||||
tenant_id: Some(self.tenant_id),
|
||||
timeline_id: Some(self.timeline_id),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let begin = tokio::time::Instant::now();
|
||||
let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
|
||||
let now = tokio::time::Instant::now();
|
||||
stats.read_lock_acquisition_micros =
|
||||
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
|
||||
let layer_removal_cs = layer_removal_cs.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _entered = phase1_span.enter();
|
||||
myself.compact_level0_phase1(
|
||||
layer_removal_cs,
|
||||
phase1_layers_locked,
|
||||
stats,
|
||||
target_file_size,
|
||||
&ctx,
|
||||
)
|
||||
})
|
||||
.await
|
||||
.context("spawn_blocking")??
|
||||
};
|
||||
|
||||
if new_layers.is_empty() && deltas_to_compact.is_empty() {
|
||||
// nothing to do
|
||||
|
||||
Reference in New Issue
Block a user