make flush frozen layer an atomic operation (#4720)

## Problem

close https://github.com/neondatabase/neon/issues/4712

## Summary of changes

Previously, when flushing frozen layers, it was split into two
operations: add delta layer to disk + remove frozen layer from memory.
This would cause a short period of time where we will have the same data
both in frozen and delta layer. In this PR, we merge them into one
atomic operation in layer map manager, therefore simplifying the code.
Note that if we decide to create image layers for L0 flush, it will
still be split into two operations on layer map.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
This commit is contained in:
Alex Chi Z
2023-07-20 13:39:19 -04:00
committed by GitHub
parent 89746a48c6
commit 3fc3666df7
5 changed files with 64 additions and 48 deletions

View File

@@ -2703,7 +2703,7 @@ impl Timeline {
// files instead. This is possible as long as *all* the data imported into the
// repository have the same LSN.
let lsn_range = frozen_layer.get_lsn_range();
let layer_paths_to_upload =
let (layer_paths_to_upload, delta_layer_to_add) =
if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
#[cfg(test)]
match &mut *self.flush_loop_state.lock().unwrap() {
@@ -2722,8 +2722,12 @@ impl Timeline {
let (partitioning, _lsn) = self
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
.await?;
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
.await?
// For image layers, we add them immediately into the layer map.
(
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
.await?,
None,
)
} else {
#[cfg(test)]
match &mut *self.flush_loop_state.lock().unwrap() {
@@ -2737,35 +2741,50 @@ impl Timeline {
assert!(!*expect_initdb_optimization, "expected initdb optimization");
}
}
// normal case, write out a L0 delta layer file.
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
HashMap::from([(delta_path, metadata)])
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
// We will remove frozen layer and add delta layer in one atomic operation later.
let layer = self.create_delta_layer(&frozen_layer).await?;
(
HashMap::from([(layer.filename(), LayerFileMetadata::new(layer.file_size()))]),
Some(layer),
)
};
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
// race situation.
// See https://github.com/neondatabase/neon/issues/4526
pausable_failpoint!("flush-frozen-before-sync");
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now. The flushed layer is stored in
// the mapping in `create_delta_layer`.
{
let mut guard = self.layers.write().await;
let l = guard.layer_map_mut().frozen_layers.pop_front();
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(compare_arced_layers(&l.unwrap(), &frozen_layer));
if let Some(ref l) = delta_layer_to_add {
// TODO: move access stats, metrics update, etc. into layer manager.
l.access_stats().record_residence_event(
&guard,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
// update metrics
let sz = l.file_size();
self.metrics.resident_physical_size_gauge.add(sz);
self.metrics.num_persistent_files_created.inc_by(1);
self.metrics.persistent_bytes_written.inc_by(sz);
}
guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
// release lock on 'layers'
}
fail_point!("checkpoint-after-sync");
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
// race situation.
// See https://github.com/neondatabase/neon/issues/4526
pausable_failpoint!("flush-frozen-pausable");
// This failpoint is used by another test case `test_pageserver_recovery`.
fail_point!("flush-frozen-exit");
// Update the metadata file, with new 'disk_consistent_lsn'
//
@@ -2847,11 +2866,12 @@ impl Timeline {
Ok(())
}
// Write out the given frozen in-memory layer as a new L0 delta file
// Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
// in layer map immediately. The caller is responsible to put it into the layer map.
async fn create_delta_layer(
self: &Arc<Self>,
frozen_layer: &Arc<InMemoryLayer>,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
) -> anyhow::Result<DeltaLayer> {
let span = tracing::info_span!("blocking");
let new_delta: DeltaLayer = tokio::task::spawn_blocking({
let _g = span.entered();
@@ -2888,25 +2908,8 @@ impl Timeline {
})
.await
.context("spawn_blocking")??;
let new_delta_name = new_delta.filename();
let sz = new_delta.desc.file_size;
// Add it to the layer map
let l = Arc::new(new_delta);
let mut guard = self.layers.write().await;
l.access_stats().record_residence_event(
&guard,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
guard.track_new_l0_delta_layer(l);
// update metrics
self.metrics.resident_physical_size_gauge.add(sz);
self.metrics.num_persistent_files_created.inc_by(1);
self.metrics.persistent_bytes_written.inc_by(sz);
Ok((new_delta_name, LayerFileMetadata::new(sz)))
Ok(new_delta)
}
async fn repartition(

View File

@@ -194,10 +194,23 @@ impl LayerManager {
updates.flush();
}
/// Insert into the layer map when a new delta layer is created, called from `create_delta_layer`.
pub fn track_new_l0_delta_layer(&mut self, delta_layer: Arc<DeltaLayer>) {
/// Flush a frozen layer and add the written delta layer to the layer map.
pub fn finish_flush_l0_layer(
&mut self,
delta_layer: Option<DeltaLayer>,
frozen_layer_for_check: &Arc<InMemoryLayer>,
) {
let l = self.layer_map.frozen_layers.pop_front();
let mut updates = self.layer_map.batch_update();
Self::insert_historic_layer(delta_layer, &mut updates, &mut self.layer_fmgr);
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
if let Some(delta_layer) = delta_layer {
Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr);
}
updates.flush();
}

View File

@@ -20,7 +20,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
}
)
pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)"))
pageserver_http.configure_failpoints(("flush-frozen-pausable", "sleep(10000)"))
endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
branch0_cur = endpoint_branch0.connect().cursor()

View File

@@ -38,8 +38,8 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
# Configure failpoints
pageserver_http.configure_failpoints(
[
("flush-frozen-before-sync", "sleep(2000)"),
("checkpoint-after-sync", "exit"),
("flush-frozen-pausable", "sleep(2000)"),
("flush-frozen-exit", "exit"),
]
)

View File

@@ -815,7 +815,7 @@ def test_compaction_delete_before_upload(
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# Now make the flushing hang and update one small piece of data
client.configure_failpoints(("flush-frozen-before-sync", "pause"))
client.configure_failpoints(("flush-frozen-pausable", "pause"))
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
@@ -841,7 +841,7 @@ def test_compaction_delete_before_upload(
time.sleep(4)
client.timeline_compact(tenant_id, timeline_id)
client.configure_failpoints(("flush-frozen-before-sync", "off"))
client.configure_failpoints(("flush-frozen-pausable", "off"))
conflict = q.get()