bring back original compaction

Signed-off-by: Alex Chi <chi@neon.tech>
This commit is contained in:
Alex Chi
2023-06-27 13:38:02 -04:00
parent a78008ad82
commit 335710cec6

View File

@@ -87,6 +87,8 @@ use super::storage_layer::{
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc,
};
const ENABLE_TIERED_COMPACTION: bool = false;
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
NotStarted,
@@ -857,6 +859,20 @@ impl Timeline {
.await
{
Ok((partitioning, lsn)) => {
if !ENABLE_TIERED_COMPACTION {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self
.create_bottom_image_layers(&partitioning, lsn, false, ctx)
.await
.map_err(anyhow::Error::from)?;
if let Some(remote_client) = &self.remote_client {
for (path, layer_metadata) in layer_paths_to_upload {
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
}
}
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
@@ -3742,8 +3758,88 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
self.compact_tiered(layer_removal_cs, target_file_size, ctx)
.await
if ENABLE_TIERED_COMPACTION {
return self
.compact_tiered(layer_removal_cs, target_file_size, ctx)
.await;
}
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = self
.compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx)
.await?;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
return Ok(());
}
// Before deleting any layers, we need to wait for their upload ops to finish.
// See storage_sync module level comment on consistency.
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
let new_delta_path = l.path();
let metadata = new_delta_path.metadata().with_context(|| {
format!(
"read file metadata for new created layer {}",
new_delta_path.display()
)
})?;
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_upload(
&l.filename(),
&LayerFileMetadata::new(metadata.len()),
)?;
}
// update the timeline's physical size
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(x.layer_desc().clone());
self.lcache.create_new_layer(x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
// delete the old ones
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
layer_names_to_delete.push(l.filename());
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
}
updates.flush();
drop_wlock(guard);
// Also schedule the deletions in remote storage
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
}
Ok(())
}
fn get_compact_task(tier_sizes: Vec<(usize, u64)>) -> Option<Vec<usize>> {