mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
include delta l0 in compaction, more metrics
Signed-off-by: Alex Chi <chi@neon.tech>
This commit is contained in:
@@ -53,6 +53,15 @@ pub enum StorageTimeOperation {
|
||||
CreateTenant,
|
||||
}
|
||||
|
||||
pub static STORAGE_PHYSICAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_storage_physical_size_sum",
|
||||
"Physical size of different types of storage files",
|
||||
&["type", "tenant_id", "timeline_id"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
|
||||
register_counter_vec!(
|
||||
"pageserver_storage_operations_seconds_sum",
|
||||
@@ -392,6 +401,8 @@ const STORAGE_IO_TIME_OPERATIONS: &[&str] = &[
|
||||
|
||||
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
|
||||
|
||||
pub const STORAGE_PHYSICAL_SIZE_FILE_TYPE: &[&str] = &["image", "delta", "partial-image"];
|
||||
|
||||
pub static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_io_operations_seconds",
|
||||
@@ -884,6 +895,7 @@ impl Drop for TimelineMetrics {
|
||||
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = READ_NUM_FS_LAYERS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = STORAGE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
|
||||
self.evictions_with_low_residence_duration
|
||||
.write()
|
||||
@@ -906,6 +918,9 @@ impl Drop for TimelineMetrics {
|
||||
for op in SMGR_QUERY_TIME_OPERATIONS {
|
||||
let _ = SMGR_QUERY_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
|
||||
}
|
||||
for ty in STORAGE_PHYSICAL_SIZE_FILE_TYPE {
|
||||
let _ = STORAGE_PHYSICAL_SIZE.remove_label_values(&[ty, tenant_id, timeline_id]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
|
||||
use super::Timeline;
|
||||
use crate::metrics::{STORAGE_PHYSICAL_SIZE, STORAGE_PHYSICAL_SIZE_FILE_TYPE};
|
||||
use crate::tenant::layer_map::{self, LayerMap};
|
||||
use anyhow::Result;
|
||||
use std::sync::{Mutex, Weak};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub struct LayerCache {
|
||||
/// Layer removal lock.
|
||||
@@ -21,6 +23,11 @@ pub struct LayerCache {
|
||||
#[allow(unused)]
|
||||
timeline: Weak<Timeline>,
|
||||
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub tenant_id_str: String,
|
||||
pub timeline_id_str: String,
|
||||
|
||||
mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
|
||||
}
|
||||
|
||||
@@ -33,11 +40,16 @@ pub struct DeleteGuard(Arc<tokio::sync::OwnedMutexGuard<()>>);
|
||||
|
||||
impl LayerCache {
|
||||
pub fn new(timeline: Weak<Timeline>) -> Self {
|
||||
let timeline_arc = timeline.upgrade().unwrap();
|
||||
Self {
|
||||
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
|
||||
layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())),
|
||||
mapping: Mutex::new(HashMap::new()),
|
||||
timeline,
|
||||
tenant_id: timeline_arc.tenant_id,
|
||||
timeline_id: timeline_arc.timeline_id,
|
||||
tenant_id_str: timeline_arc.tenant_id.to_string(),
|
||||
timeline_id_str: timeline_arc.timeline_id.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,18 +79,21 @@ impl LayerCache {
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
self.metrics_size_sub(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.remove(&layer.layer_desc().key());
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
|
||||
self.metrics_size_add(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
self.metrics_size_add(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
@@ -130,6 +145,7 @@ impl LayerCache {
|
||||
|
||||
/// Called within write path. When compaction and image layer creation we will create new layers.
|
||||
pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
self.metrics_size_add(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
@@ -137,7 +153,38 @@ impl LayerCache {
|
||||
/// Called within write path. When GC and compaction we will remove layers and delete them on disk.
|
||||
/// Will move logic to delete files here later.
|
||||
pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
self.metrics_size_sub(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.remove(&layer.layer_desc().key());
|
||||
}
|
||||
|
||||
fn metrics_size_add(&self, layer: &dyn PersistentLayer) {
|
||||
STORAGE_PHYSICAL_SIZE
|
||||
.with_label_values(&[
|
||||
Self::get_layer_type(layer),
|
||||
&self.tenant_id_str,
|
||||
&self.timeline_id_str,
|
||||
])
|
||||
.add(layer.file_size() as i64);
|
||||
}
|
||||
|
||||
fn metrics_size_sub(&self, layer: &dyn PersistentLayer) {
|
||||
STORAGE_PHYSICAL_SIZE
|
||||
.with_label_values(&[
|
||||
Self::get_layer_type(layer),
|
||||
&self.tenant_id_str,
|
||||
&self.timeline_id_str,
|
||||
])
|
||||
.sub(layer.file_size() as i64);
|
||||
}
|
||||
|
||||
fn get_layer_type(layer: &dyn PersistentLayer) -> &'static str {
|
||||
if layer.is_delta() {
|
||||
&STORAGE_PHYSICAL_SIZE_FILE_TYPE[1]
|
||||
} else if layer.is_incremental() {
|
||||
&STORAGE_PHYSICAL_SIZE_FILE_TYPE[2]
|
||||
} else {
|
||||
&STORAGE_PHYSICAL_SIZE_FILE_TYPE[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,11 +95,56 @@ pub struct LayerMap {
|
||||
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
|
||||
|
||||
/// All sorted runs. For tiered compaction.
|
||||
pub sorted_runs: Vec<(usize, Vec<Arc<PersistentLayerDesc>>)>,
|
||||
pub sorted_runs: SortedRuns,
|
||||
}
|
||||
|
||||
/// The sorted runs (tiers) of the LSM tree, used for tiered compaction.
#[derive(Default)]
pub struct SortedRuns {
    // `(tier_id, layers)` pairs, ordered from the top of the LSM tree
    // (index 0) to the bottom; see `create_new_run` / `create_new_bottom_run`.
    pub runs: Vec<(usize, Vec<Arc<PersistentLayerDesc>>)>,
    // Monotonic counter used to mint unique tier ids; see `next_tier_id`.
    next_tier_id: usize,
}
|
||||
|
||||
impl SortedRuns {
|
||||
/// Create a new sorted run and insert it at the top of the LSM tree.
|
||||
pub fn create_new_run(&mut self, layers: Vec<Arc<PersistentLayerDesc>>) -> usize {
|
||||
let tier_id = self.next_tier_id();
|
||||
self.runs.insert(0, (tier_id, layers));
|
||||
tier_id
|
||||
}
|
||||
|
||||
/// Create a new sorted run and insert it at the bottom of the LSM tree.
|
||||
pub fn create_new_bottom_run(&mut self, layers: Vec<Arc<PersistentLayerDesc>>) -> usize {
|
||||
let tier_id = self.next_tier_id();
|
||||
self.runs.push((tier_id, layers));
|
||||
tier_id
|
||||
}
|
||||
|
||||
pub fn compute_tier_sizes(&self) -> Vec<(usize, u64)> {
|
||||
self.runs
|
||||
.iter()
|
||||
.map(|(tier_id, layers)| (*tier_id, layers.iter().map(|layer| layer.file_size()).sum()))
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
/// Remove a sorted run from the LSM tree.
|
||||
pub fn remove_run(&mut self, tier_id: usize) {
|
||||
self.runs.retain(|(id, _)| *id != tier_id);
|
||||
}
|
||||
|
||||
/// Remove layers and the corresponding sorted runs.
|
||||
pub fn insert_run_at(&mut self, idx: usize, layers: Vec<Arc<PersistentLayerDesc>>) {}
|
||||
|
||||
pub fn num_of_tiers(&self) -> usize {
|
||||
self.runs.len()
|
||||
}
|
||||
|
||||
pub fn next_tier_id(&mut self) -> usize {
|
||||
let ret = self.next_tier_id;
|
||||
self.next_tier_id += 1;
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
/// The primary update API for the layer map.
|
||||
///
|
||||
/// Batching historic layer insertions and removals is good for
|
||||
@@ -127,16 +172,10 @@ impl BatchedUpdates<'_> {
|
||||
}
|
||||
|
||||
/// Get a reference to the current sorted runs.
|
||||
pub fn sorted_runs(&mut self) -> &mut Vec<(usize, Vec<Arc<PersistentLayerDesc>>)> {
|
||||
pub fn sorted_runs(&mut self) -> &mut SortedRuns {
|
||||
&mut self.layer_map.sorted_runs
|
||||
}
|
||||
|
||||
pub fn next_tier_id(&mut self) -> usize {
|
||||
let ret = self.layer_map.next_tier_id;
|
||||
self.layer_map.next_tier_id += 1;
|
||||
ret
|
||||
}
|
||||
|
||||
///
|
||||
/// Remove an on-disk layer from the map.
|
||||
///
|
||||
@@ -677,7 +716,7 @@ impl LayerMap {
|
||||
}
|
||||
|
||||
println!("sorted_runs:");
|
||||
for (lvl, (tier_id, layer)) in self.sorted_runs.iter().enumerate() {
|
||||
for (lvl, (tier_id, layer)) in self.sorted_runs.runs.iter().enumerate() {
|
||||
println!("tier {}", tier_id);
|
||||
for layer in layer {
|
||||
layer.dump(verbose, ctx)?;
|
||||
|
||||
@@ -3081,6 +3081,9 @@ impl Timeline {
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
batch_updates.insert_historic_new(l.layer_desc().clone());
|
||||
batch_updates
|
||||
.sorted_runs()
|
||||
.create_new_run(vec![l.layer_desc().clone().into()]);
|
||||
self.lcache.create_new_layer(l);
|
||||
batch_updates.flush();
|
||||
|
||||
@@ -3340,9 +3343,8 @@ impl Timeline {
|
||||
|
||||
// add this layer to the end of all sorted runs; this is only done when initializing with init_lsn
|
||||
// for now, and therefore the sorted runs are empty.
|
||||
assert!(updates.sorted_runs().is_empty());
|
||||
let tier_id = updates.next_tier_id();
|
||||
updates.sorted_runs().push((tier_id, sorted_run));
|
||||
assert_eq!(updates.sorted_runs().num_of_tiers(), 0);
|
||||
updates.sorted_runs().create_new_bottom_run(sorted_run);
|
||||
updates.flush();
|
||||
drop_wlock(guard);
|
||||
timer.stop_and_record();
|
||||
@@ -3740,98 +3742,8 @@ impl Timeline {
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), CompactionError> {
|
||||
let CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
} = self
|
||||
.compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx)
|
||||
.await?;
|
||||
|
||||
if new_layers.is_empty() && deltas_to_compact.is_empty() {
|
||||
// If L0 does not need to be compacted, look into other layers
|
||||
return self
|
||||
.compact_tiered(layer_removal_cs, target_file_size, ctx)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Before deleting any layers, we need to wait for their upload ops to finish.
|
||||
// See storage_sync module level comment on consistency.
|
||||
// Do it here because we don't want to hold self.layers.write() while waiting.
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
debug!("waiting for upload ops to complete");
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layers, _) = &mut *guard;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
|
||||
|
||||
let tier_id = updates.next_tier_id();
|
||||
updates.sorted_runs().insert(
|
||||
0,
|
||||
(
|
||||
tier_id,
|
||||
new_layers
|
||||
.iter()
|
||||
.map(|l| Arc::new(l.layer_desc().clone()))
|
||||
.collect(),
|
||||
),
|
||||
);
|
||||
|
||||
for l in new_layers {
|
||||
let new_delta_path = l.path();
|
||||
|
||||
let metadata = new_delta_path.metadata().with_context(|| {
|
||||
format!(
|
||||
"read file metadata for new created layer {}",
|
||||
new_delta_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_layer_file_upload(
|
||||
&l.filename(),
|
||||
&LayerFileMetadata::new(metadata.len()),
|
||||
)?;
|
||||
}
|
||||
|
||||
// update the timeline's physical size
|
||||
self.metrics
|
||||
.resident_physical_size_gauge
|
||||
.add(metadata.len());
|
||||
|
||||
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
||||
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
||||
x.access_stats().record_residence_event(
|
||||
&updates,
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
updates.insert_historic_new(x.layer_desc().clone());
|
||||
self.lcache.create_new_layer(x);
|
||||
}
|
||||
|
||||
// Now that we have reshuffled the data to set of new delta layers, we can
|
||||
// delete the old ones
|
||||
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
|
||||
for l in deltas_to_compact {
|
||||
layer_names_to_delete.push(l.filename());
|
||||
self.delete_historic_layer_new(layer_removal_cs.clone(), l, &mut updates)?;
|
||||
}
|
||||
|
||||
updates.flush();
|
||||
drop_wlock(guard);
|
||||
|
||||
// Also schedule the deletions in remote storage
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
self.compact_tiered(layer_removal_cs, target_file_size, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
fn get_compact_task(tier_sizes: Vec<(usize, u64)>) -> Option<Vec<usize>> {
|
||||
@@ -3877,30 +3789,23 @@ impl Timeline {
|
||||
let (layers, _) = &*guard;
|
||||
|
||||
// Precondition: only compact if enough layers have accumulated.
|
||||
let threshold = 3;
|
||||
let threshold = 8;
|
||||
assert!(threshold >= 2);
|
||||
|
||||
info!("getting tiered compaction task");
|
||||
|
||||
layers.dump(false, ctx)?;
|
||||
|
||||
if layers.sorted_runs.len() < threshold {
|
||||
if layers.sorted_runs.num_of_tiers() < threshold {
|
||||
info!(
|
||||
level0_deltas = layers.sorted_runs.len(),
|
||||
level0_deltas = layers.sorted_runs.num_of_tiers(),
|
||||
threshold, "too few sorted runs to compact"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Gather the files to compact in this iteration.
|
||||
|
||||
let tier_sizes: Vec<(usize, u64)> = layers
|
||||
.sorted_runs
|
||||
.iter()
|
||||
.map(|(tier_id, layers)| {
|
||||
(*tier_id, layers.iter().map(|layer| layer.file_size()).sum())
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let tier_sizes: Vec<(usize, u64)> = layers.sorted_runs.compute_tier_sizes();
|
||||
|
||||
let Some(tier_to_compact) = Self::get_compact_task(tier_sizes) else {
|
||||
return Ok(None);
|
||||
@@ -3913,7 +3818,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let mut deltas_to_compact_layers = vec![];
|
||||
for (tier_id, layers) in layers.sorted_runs.iter() {
|
||||
for (tier_id, layers) in layers.sorted_runs.runs.iter() {
|
||||
if tier_to_compact.contains(tier_id) {
|
||||
deltas_to_compact_layers.extend(layers.iter().cloned());
|
||||
}
|
||||
@@ -4197,7 +4102,7 @@ impl Timeline {
|
||||
let mut new_tier_at_index = None;
|
||||
let mut layers_to_delete = vec![];
|
||||
let mut layer_names_to_delete = vec![];
|
||||
for (tier_id, tier) in updates.sorted_runs() {
|
||||
for (tier_id, tier) in &updates.sorted_runs().runs {
|
||||
if *tier_id == new_tier_at {
|
||||
new_tier_at_index = Some(new_sorted_runs.len());
|
||||
}
|
||||
@@ -4252,9 +4157,9 @@ impl Timeline {
|
||||
self.lcache.create_new_layer(l);
|
||||
}
|
||||
|
||||
let new_tier_id = updates.next_tier_id();
|
||||
let new_tier_id = updates.sorted_runs().next_tier_id();
|
||||
new_sorted_runs.insert(new_tier_at_index, (new_tier_id, new_layer_descs));
|
||||
*updates.sorted_runs() = new_sorted_runs;
|
||||
updates.sorted_runs().runs = new_sorted_runs;
|
||||
|
||||
updates.flush();
|
||||
drop_wlock(guard);
|
||||
|
||||
Reference in New Issue
Block a user