parallel compaction

Signed-off-by: Alex Chi <chi@neon.tech>
Alex Chi
2023-06-27 16:42:56 -04:00
parent eb93e686ab
commit f3fdaf8ef1
4 changed files with 103 additions and 58 deletions

View File

@@ -1560,7 +1560,7 @@ impl Tenant {
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.lcache.delete_guard().await;
let layer_removal_guard = timeline.lcache.delete_guard_write().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
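Timeline deletion now takes the exclusive (write) side of the removal lock, so it still waits for every in-flight compaction that holds a read guard. A minimal sketch of that interaction using plain tokio primitives (the LayerCache wrapper types come in the next file):

    use std::sync::Arc;
    use tokio::sync::RwLock;

    #[tokio::main]
    async fn main() {
        // Stand-in for LayerCache::layers_removal_lock after this commit.
        let removal_lock = Arc::new(RwLock::new(()));

        // A compaction task holds the shared guard while it reads layers.
        let read_guard = removal_lock.clone().read_owned().await;

        // delete_timeline needs the exclusive guard and must wait for the reader.
        let deleter = tokio::spawn({
            let removal_lock = removal_lock.clone();
            async move {
                let _write_guard = removal_lock.write_owned().await;
                // only now is it safe to delete layer files
            }
        });

        drop(read_guard); // compaction finishes; deletion proceeds
        deleter.await.unwrap();
    }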

View File

@@ -13,7 +13,7 @@ pub struct LayerCache {
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<RwLock>` because we need an owned
/// lock guard in functions that are spawned onto the tokio I/O pool (which requires `'static`).
pub layers_removal_lock: Arc<tokio::sync::Mutex<()>>,
pub layers_removal_lock: Arc<tokio::sync::RwLock<()>>,
/// We need this lock because we have no other way to prevent GC/compaction from removing files that are in use.
/// Once we reference-count layers with `Arc` to prevent that, this lock can be removed.
@@ -36,13 +36,16 @@ pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>);
pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>);
#[derive(Clone)]
pub struct DeleteGuard(Arc<tokio::sync::OwnedMutexGuard<()>>);
pub struct DeleteGuardRead(Arc<tokio::sync::OwnedRwLockReadGuard<()>>);
#[derive(Clone)]
pub struct DeleteGuardWrite(Arc<tokio::sync::OwnedRwLockWriteGuard<()>>);
impl LayerCache {
pub fn new(timeline: Weak<Timeline>, tenant_id: TenantId, timeline_id: TimelineId) -> Self {
Self {
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())),
layers_removal_lock: Arc::new(tokio::sync::RwLock::new(())),
mapping: Mutex::new(HashMap::new()),
timeline,
tenant_id,
@@ -70,9 +73,16 @@ impl LayerCache {
}
/// Shared guard: multiple compaction tasks may hold this concurrently; it only excludes layer removal.
pub async fn delete_guard(&self) -> DeleteGuard {
DeleteGuard(Arc::new(
self.layers_removal_lock.clone().lock_owned().await,
pub async fn delete_guard_read(&self) -> DeleteGuardRead {
DeleteGuardRead(Arc::new(
self.layers_removal_lock.clone().read_owned().await,
))
}
/// Exclusive guard: lets only one remover (GC, compaction phase 2, or timeline deletion) proceed, with no concurrent readers.
pub async fn delete_guard_write(&self) -> DeleteGuardWrite {
DeleteGuardWrite(Arc::new(
self.layers_removal_lock.clone().write_owned().await,
))
}
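Both guards wrap an owned tokio lock guard in an Arc: owned guards are 'static (they keep the lock alive through their own internal Arc), so they can be moved into tasks spawned on the I/O pool, and the outer Arc makes the guard cheaply cloneable, releasing the lock only when the last clone drops. A hedged sketch of that property, outside the LayerCache API:

    use std::sync::Arc;
    use tokio::sync::RwLock;

    #[tokio::main]
    async fn main() {
        let lock = Arc::new(RwLock::new(()));

        // The owned guard is 'static, so it can cross into a spawned task.
        let guard = Arc::new(lock.clone().read_owned().await);

        let shared = guard.clone(); // clones share one underlying guard
        let io_task = tokio::spawn(async move {
            // ... I/O that must keep layer removal blocked ...
            drop(shared);
        });

        io_task.await.unwrap();
        drop(guard); // the read lock is released here, at the last clone
    }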

View File

@@ -14,35 +14,41 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;
use super::timeline::ENABLE_TIERED_COMPACTION;
/// Start per tenant background loops: compaction and gc.
pub fn start_background_loops(
tenant: &Arc<Tenant>,
background_jobs_can_start: Option<&completion::Barrier>,
) {
let tenant_id = tenant.tenant_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
Some(tenant_id),
None,
&format!("compactor for tenant {tenant_id}"),
false,
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
compaction_loop(tenant, cancel)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
}
},
);
// Spawn two compaction tasks when tiered compaction is enabled, otherwise one.
let range = if ENABLE_TIERED_COMPACTION { 0..2 } else { 0..1 };
for _ in range {
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
Some(tenant_id),
None,
&format!("compactor for tenant {tenant_id}"),
false,
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
compaction_loop(tenant, cancel)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
}
},
);
}
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::GarbageCollector,

View File

@@ -22,7 +22,7 @@ use tracing::*;
use utils::id::TenantTimelineId;
use std::cmp::{max, min, Ordering};
use std::collections::{BinaryHeap, HashMap};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
@@ -79,7 +79,7 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_cache::{DeleteGuard, LayerCache};
use super::layer_cache::{DeleteGuardRead, DeleteGuardWrite, LayerCache};
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -87,7 +87,7 @@ use super::storage_layer::{
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc,
};
const ENABLE_TIERED_COMPACTION: bool = false;
pub const ENABLE_TIERED_COMPACTION: bool = false;
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
@@ -155,6 +155,8 @@ pub struct Timeline {
pub(super) lcache: LayerCache,
compacting: Mutex<HashSet<usize>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
/// It is used by the compaction task when it checks whether a new image layer should be created.
@@ -840,7 +842,7 @@ impl Timeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let layer_removal_cs = self.lcache.delete_guard().await;
let layer_removal_cs = self.lcache.delete_guard_read().await;
// Is the timeline being deleted?
if self.is_stopping() {
return Err(anyhow::anyhow!("timeline is Stopping").into());
@@ -875,7 +877,7 @@ impl Timeline {
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
self.compact_level0(layer_removal_cs, target_file_size, ctx)
.await?;
timer.stop_and_record();
}
@@ -1174,7 +1176,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
// now lock out layer removal (compaction, gc, timeline deletion)
let layer_removal_guard = self.lcache.delete_guard().await;
let layer_removal_guard = self.lcache.delete_guard_write().await;
{
// to avoid racing with detach and delete_timeline
@@ -1212,7 +1214,7 @@ impl Timeline {
fn evict_layer_batch_impl(
&self,
_layer_removal_cs: &DeleteGuard,
_layer_removal_cs: &DeleteGuardWrite,
local_layer: &Arc<dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_>,
) -> anyhow::Result<bool> {
@@ -1439,6 +1441,7 @@ impl Timeline {
layers: tokio::sync::RwLock::new((LayerMap::default(), LayerMapping::new())),
lcache: LayerCache::new(myself.clone(), tenant_id, timeline_id),
wanted_image_layers: Mutex::new(None),
compacting: Mutex::new(HashSet::default()),
walredo_mgr,
walreceiver: Mutex::new(None),
@@ -2306,7 +2309,7 @@ impl Timeline {
fn delete_historic_layer(
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: DeleteGuard,
_layer_removal_cs: DeleteGuardWrite,
layer: Arc<PersistentLayerDesc>,
_updates: &mut BatchedUpdates<'_>,
) -> anyhow::Result<()> {
@@ -2319,7 +2322,7 @@ impl Timeline {
fn delete_historic_layer_new(
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: DeleteGuard,
_layer_removal_cs: DeleteGuardWrite,
layer: Arc<PersistentLayerDesc>,
updates: &mut BatchedUpdates<'_>,
) -> anyhow::Result<()> {
@@ -3406,7 +3409,7 @@ impl Timeline {
/// start of level0 files compaction, the on-demand download should be revisited as well.
async fn compact_level0_phase1(
&self,
_layer_removal_cs: DeleteGuard,
layer_removal_cs: DeleteGuardRead,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
@@ -3756,7 +3759,7 @@ impl Timeline {
///
async fn compact_level0(
self: &Arc<Self>,
layer_removal_cs: DeleteGuard,
layer_removal_cs: DeleteGuardRead,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
@@ -3773,6 +3776,9 @@ impl Timeline {
.compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx)
.await?;
drop(layer_removal_cs);
let layer_removal_cs = self.lcache.delete_guard_write().await;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
return Ok(());
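Note that this is a drop-and-reacquire, not an atomic lock upgrade: the shared guard from phase 1 is dropped first, and only then is the exclusive guard taken to install the new layers. GC or another compactor can run in the gap, so nothing after the reacquisition may assume the layer map is unchanged. A minimal sketch of the hand-off (phase bodies elided):

    use std::sync::Arc;
    use tokio::sync::RwLock;

    async fn compact_once(removal_lock: Arc<RwLock<()>>) {
        // Phase 1: shared guard; other compactors may run concurrently.
        let read_guard = removal_lock.clone().read_owned().await;
        // ... compact_level0_phase1: read layers, write new files ...
        drop(read_guard);

        // Not an atomic upgrade: between drop() and write_owned(), GC or
        // another compactor may take the lock and change the layer map.
        let _write_guard = removal_lock.write_owned().await;
        // ... install new layers, delete the replaced ones ...
    }

    #[tokio::main]
    async fn main() {
        compact_once(Arc::new(RwLock::new(()))).await;
    }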
@@ -3848,7 +3854,10 @@ impl Timeline {
Ok(())
}
fn get_compact_task(tier_sizes: Vec<(usize, u64)>) -> Option<Vec<usize>> {
fn get_compact_task(
skip_tiers: &HashSet<usize>,
tier_sizes: Vec<(usize, u64)>,
) -> Option<Vec<usize>> {
let size_ratio = 1.25;
let space_amplification_ratio = 2.0;
let max_merge_width = 20;
@@ -3858,15 +3867,19 @@ impl Timeline {
let (_, last_tier_size) = *tier_sizes.last().unwrap();
let estimated_space_amp = (total_tier_size - last_tier_size) as f64 / last_tier_size as f64;
if estimated_space_amp > space_amplification_ratio {
info!("full compaction triggered by space amplification");
let tiers = tier_sizes
.iter()
.rev()
.take(max_merge_width)
.rev()
.map(|(tier_id, _)| *tier_id)
.collect::<Vec<_>>();
return Some(tiers);
if tier_sizes.is_empty() {
info!("full compaction cannot be triggered as some layers are being compacted");
} else {
info!("full compaction triggered by space amplification");
let tiers = tier_sizes
.iter()
.rev()
.take(max_merge_width)
.rev()
.map(|(tier_id, _)| *tier_id)
.collect::<Vec<_>>();
return Some(tiers);
}
}
// Trigger 2: by size ratio
@@ -3884,6 +3897,9 @@ impl Timeline {
.collect_vec();
return Some(compact_tiers);
}
if skip_tiers.contains(&tier_id) {
break;
}
total_size_up_to_lvl += size;
compact_tiers.push(tier_id);
}
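To make the trigger arithmetic concrete: with space_amplification_ratio = 2.0, tier sizes [(0, 10), (1, 10), (2, 10), (3, 8)] give an estimated amplification of (38 - 8) / 8 = 3.75, which triggers a full compaction of up to max_merge_width tiers. A simplified, self-contained reconstruction of the selection function follows; the size-ratio condition of trigger 2 is not visible in this diff and is deliberately elided, and the empty-list check is moved ahead of last().unwrap() so the sketch cannot panic:

    use std::collections::HashSet;

    fn get_compact_task_sketch(
        skip_tiers: &HashSet<usize>,
        tier_sizes: Vec<(usize, u64)>,
    ) -> Option<Vec<usize>> {
        let space_amplification_ratio = 2.0_f64;
        let max_merge_width = 20;

        if tier_sizes.is_empty() {
            // every tier is already being compacted elsewhere
            return None;
        }

        // Trigger 1: space amplification = (total - last tier) / last tier.
        let total: u64 = tier_sizes.iter().map(|(_, size)| size).sum();
        let (_, last) = *tier_sizes.last().unwrap();
        if (total - last) as f64 / last as f64 > space_amplification_ratio {
            // Full compaction over at most the last max_merge_width tiers.
            return Some(
                tier_sizes
                    .iter()
                    .rev()
                    .take(max_merge_width)
                    .rev()
                    .map(|(tier_id, _)| *tier_id)
                    .collect(),
            );
        }

        // Trigger 2 (size ratio, condition elided): walk tiers top-down but
        // stop at the first tier a concurrent compactor has already claimed.
        let mut compact_tiers = Vec::new();
        for &(tier_id, _size) in &tier_sizes {
            if skip_tiers.contains(&tier_id) {
                break;
            }
            compact_tiers.push(tier_id);
        }
        let _ = compact_tiers; // the ratio check would decide whether to return these
        None
    }

    fn main() {
        let busy = HashSet::new();
        let tiers = vec![(0, 10), (1, 10), (2, 10), (3, 8)];
        // amplification 3.75 > 2.0, so all four tiers are selected
        println!("{:?}", get_compact_task_sketch(&busy, tiers)); // Some([0, 1, 2, 3])
    }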
@@ -3892,7 +3908,7 @@ impl Timeline {
async fn compact_tiered_phase1(
&self,
_layer_removal_cs: DeleteGuard,
_layer_removal_cs: DeleteGuardRead,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<Option<CompactTieredPhase1Result>, CompactionError> {
@@ -3918,19 +3934,24 @@ impl Timeline {
// Gather the files to compact in this iteration.
let tier_sizes: Vec<(usize, u64)> = layers.sorted_runs.compute_tier_sizes();
let mut compacting_tiers = self.compacting.lock().unwrap();
let Some(tier_to_compact) = Self::get_compact_task(tier_sizes) else {
let Some(tier_to_compact) = Self::get_compact_task(&compacting_tiers, tier_sizes) else {
return Ok(None);
};
println!("tier_to_compact: {tier_to_compact:?}");
let min_merge_width = 3;
if tier_to_compact.len() < min_merge_width {
return Ok(None);
}
println!("tier_to_compact: {tier_to_compact:?}");
for &tier in &tier_to_compact {
compacting_tiers.insert(tier);
}
drop(compacting_tiers);
let mut deltas_to_compact_layers = vec![];
for (tier_id, layers) in layers.sorted_runs.runs.iter() {
if tier_to_compact.contains(tier_id) {
@@ -4179,6 +4200,11 @@ impl Timeline {
drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
let mut compacting_tiers = self.compacting.lock().unwrap();
for &tier in &tier_to_compact {
compacting_tiers.remove(&tier);
}
Ok(Some(CompactTieredPhase1Result {
new_layers,
new_tier_at: *tier_to_compact.last().unwrap(),
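The `compacting` set is what keeps the two compactor tasks off each other's tiers: the chosen tiers are inserted under the mutex before the shared delete guard is released, and removed again once phase 1 completes. The hunks above appear to release the claim manually, so an early error return would leave tiers marked as busy; here is a sketch of the same protocol with an RAII claim that also covers the error path:

    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    // RAII claim over a set of tiers: inserted on creation, removed on drop,
    // so an error return in phase 1 cannot leak the claim.
    struct TierClaim {
        compacting: Arc<Mutex<HashSet<usize>>>,
        tiers: Vec<usize>,
    }

    impl TierClaim {
        fn try_claim(compacting: Arc<Mutex<HashSet<usize>>>, tiers: Vec<usize>) -> Option<Self> {
            let mut set = compacting.lock().unwrap();
            if tiers.iter().any(|t| set.contains(t)) {
                return None; // another compactor owns one of these tiers
            }
            set.extend(tiers.iter().copied());
            drop(set);
            Some(TierClaim { compacting, tiers })
        }
    }

    impl Drop for TierClaim {
        fn drop(&mut self) {
            let mut set = self.compacting.lock().unwrap();
            for t in &self.tiers {
                set.remove(t);
            }
        }
    }

    fn main() {
        let compacting = Arc::new(Mutex::new(HashSet::new()));
        let claim = TierClaim::try_claim(compacting.clone(), vec![1, 2, 3]).unwrap();
        // A second claim on an overlapping range fails while the first is live.
        assert!(TierClaim::try_claim(compacting.clone(), vec![3, 4]).is_none());
        drop(claim);
        assert!(TierClaim::try_claim(compacting, vec![3, 4]).is_some());
        println!("ok");
    }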
@@ -4190,7 +4216,7 @@ impl Timeline {
/// Tiered Compaction for level > 0
async fn compact_tiered(
self: &Arc<Self>,
layer_removal_cs: DeleteGuard,
layer_removal_cs: DeleteGuardRead,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
@@ -4202,6 +4228,9 @@ impl Timeline {
.compact_tiered_phase1(layer_removal_cs.clone(), target_file_size, ctx)
.await? else { return Ok(()); };
drop(layer_removal_cs);
let layer_removal_cs = self.lcache.delete_guard_write().await;
println!("new_tier_at: {:?}", new_tier_at);
println!("removed_tiers: {:?}", removed_tiers);
@@ -4406,7 +4435,7 @@ impl Timeline {
fail_point!("before-timeline-gc");
let layer_removal_cs = self.lcache.delete_guard().await;
let layer_removal_cs = self.lcache.delete_guard_write().await;
// Is the timeline being deleted?
if self.is_stopping() {
anyhow::bail!("timeline is Stopping");
@@ -4444,7 +4473,7 @@ impl Timeline {
async fn gc_timeline(
&self,
layer_removal_cs: DeleteGuard,
layer_removal_cs: DeleteGuardWrite,
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
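Read off the call sites across these hunks, the removal-lock protocol now looks like this (a summary comment, not code from the commit):

    /// Removal-lock modes after this commit:
    /// - delete_guard_read(): compaction phase 1 (compact_level0_phase1,
    ///   compact_tiered_phase1) - reads layers and writes new files, never
    ///   removes anything, so several compactors can hold it at once.
    /// - delete_guard_write(): everything that removes or evicts layer files -
    ///   compact_level0 / compact_tiered phase 2, gc / gc_timeline,
    ///   evict_layer_batch, and Tenant::delete_timeline.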