From 3fc3666df73f3d148973831762bf87508a92d3e3 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 20 Jul 2023 13:39:19 -0400 Subject: [PATCH] make flush frozen layer an atomic operation (#4720) ## Problem Closes https://github.com/neondatabase/neon/issues/4712 ## Summary of changes Previously, flushing a frozen layer was split into two operations: adding the delta layer on disk and removing the frozen layer from memory. This left a short window in which the same data was present in both the frozen layer and the delta layer. In this PR, we merge them into one atomic operation in the layer map manager, thereby simplifying the code. Note that if we decide to create image layers for an L0 flush, it will still be split into two operations on the layer map. --------- Signed-off-by: Alex Chi Z Co-authored-by: Joonas Koivunen --- pageserver/src/tenant/timeline.rs | 83 ++++++++++--------- .../src/tenant/timeline/layer_manager.rs | 19 ++++- test_runner/regress/test_ancestor_branch.py | 2 +- test_runner/regress/test_recovery.py | 4 +- test_runner/regress/test_remote_storage.py | 4 +- 5 files changed, 64 insertions(+), 48 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index e5fedeb73e..c6ceae500b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2703,7 +2703,7 @@ impl Timeline { // files instead. This is possible as long as *all* the data imported into the // repository have the same LSN. let lsn_range = frozen_layer.get_lsn_range(); - let layer_paths_to_upload = + let (layer_paths_to_upload, delta_layer_to_add) = if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) { #[cfg(test)] match &mut *self.flush_loop_state.lock().unwrap() { @@ -2722,8 +2722,12 @@ impl Timeline { let (partitioning, _lsn) = self .repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx) .await?; - self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx) - .await? 
+ // For image layers, we add them immediately into the layer map. + ( + self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx) + .await?, + None, + ) } else { #[cfg(test)] match &mut *self.flush_loop_state.lock().unwrap() { @@ -2737,35 +2741,50 @@ impl Timeline { assert!(!*expect_initdb_optimization, "expected initdb optimization"); } } - // normal case, write out a L0 delta layer file. - let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?; - HashMap::from([(delta_path, metadata)]) + // Normal case, write out a L0 delta layer file. + // `create_delta_layer` will not modify the layer map. + // We will remove frozen layer and add delta layer in one atomic operation later. + let layer = self.create_delta_layer(&frozen_layer).await?; + ( + HashMap::from([(layer.filename(), LayerFileMetadata::new(layer.file_size()))]), + Some(layer), + ) }; - // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, - // a compaction can delete the file and then it won't be available for uploads any more. - // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this - // race situation. - // See https://github.com/neondatabase/neon/issues/4526 - - pausable_failpoint!("flush-frozen-before-sync"); - // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. The flushed layer is stored in // the mapping in `create_delta_layer`. { let mut guard = self.layers.write().await; - let l = guard.layer_map_mut().frozen_layers.pop_front(); - // Only one thread may call this function at a time (for this - // timeline). If two threads tried to flush the same frozen - // layer to disk at the same time, that would not work. - assert!(compare_arced_layers(&l.unwrap(), &frozen_layer)); + if let Some(ref l) = delta_layer_to_add { + // TODO: move access stats, metrics update, etc. into layer manager. 
+ l.access_stats().record_residence_event( + &guard, + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + // update metrics + let sz = l.file_size(); + self.metrics.resident_physical_size_gauge.add(sz); + self.metrics.num_persistent_files_created.inc_by(1); + self.metrics.persistent_bytes_written.inc_by(sz); + } + + guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer); // release lock on 'layers' } - fail_point!("checkpoint-after-sync"); + // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, + // a compaction can delete the file and then it won't be available for uploads any more. + // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this + // race situation. + // See https://github.com/neondatabase/neon/issues/4526 + pausable_failpoint!("flush-frozen-pausable"); + + // This failpoint is used by another test case `test_pageserver_recovery`. + fail_point!("flush-frozen-exit"); // Update the metadata file, with new 'disk_consistent_lsn' // @@ -2847,11 +2866,12 @@ impl Timeline { Ok(()) } - // Write out the given frozen in-memory layer as a new L0 delta file + // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked + // in the layer map immediately. The caller is responsible for putting it into the layer map. 
async fn create_delta_layer( self: &Arc, frozen_layer: &Arc, - ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { + ) -> anyhow::Result { let span = tracing::info_span!("blocking"); let new_delta: DeltaLayer = tokio::task::spawn_blocking({ let _g = span.entered(); @@ -2888,25 +2908,8 @@ impl Timeline { }) .await .context("spawn_blocking")??; - let new_delta_name = new_delta.filename(); - let sz = new_delta.desc.file_size; - // Add it to the layer map - let l = Arc::new(new_delta); - let mut guard = self.layers.write().await; - l.access_stats().record_residence_event( - &guard, - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); - guard.track_new_l0_delta_layer(l); - - // update metrics - self.metrics.resident_physical_size_gauge.add(sz); - self.metrics.num_persistent_files_created.inc_by(1); - self.metrics.persistent_bytes_written.inc_by(sz); - - Ok((new_delta_name, LayerFileMetadata::new(sz))) + Ok(new_delta) } async fn repartition( diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 77f5f38314..f6f0d533d1 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -194,10 +194,23 @@ impl LayerManager { updates.flush(); } - /// Insert into the layer map when a new delta layer is created, called from `create_delta_layer`. - pub fn track_new_l0_delta_layer(&mut self, delta_layer: Arc) { + /// Flush a frozen layer and add the written delta layer to the layer map. + pub fn finish_flush_l0_layer( + &mut self, + delta_layer: Option, + frozen_layer_for_check: &Arc, + ) { + let l = self.layer_map.frozen_layers.pop_front(); let mut updates = self.layer_map.batch_update(); - Self::insert_historic_layer(delta_layer, &mut updates, &mut self.layer_fmgr); + + // Only one thread may call this function at a time (for this + // timeline). 
If two threads tried to flush the same frozen + // layer to disk at the same time, that would not work. + assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check)); + + if let Some(delta_layer) = delta_layer { + Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr); + } updates.flush(); } diff --git a/test_runner/regress/test_ancestor_branch.py b/test_runner/regress/test_ancestor_branch.py index e8c1a2f34c..0e390ba9e5 100644 --- a/test_runner/regress/test_ancestor_branch.py +++ b/test_runner/regress/test_ancestor_branch.py @@ -20,7 +20,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): } ) - pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)")) + pageserver_http.configure_failpoints(("flush-frozen-pausable", "sleep(10000)")) endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant) branch0_cur = endpoint_branch0.connect().cursor() diff --git a/test_runner/regress/test_recovery.py b/test_runner/regress/test_recovery.py index 76e97a35a4..552825cf08 100644 --- a/test_runner/regress/test_recovery.py +++ b/test_runner/regress/test_recovery.py @@ -38,8 +38,8 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder): # Configure failpoints pageserver_http.configure_failpoints( [ - ("flush-frozen-before-sync", "sleep(2000)"), - ("checkpoint-after-sync", "exit"), + ("flush-frozen-pausable", "sleep(2000)"), + ("flush-frozen-exit", "exit"), ] ) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 13bc01f609..f1575ae4d3 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -815,7 +815,7 @@ def test_compaction_delete_before_upload( wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) # Now make the flushing hang and update one small piece of data - client.configure_failpoints(("flush-frozen-before-sync", "pause")) + 
client.configure_failpoints(("flush-frozen-pausable", "pause")) endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1") @@ -841,7 +841,7 @@ def test_compaction_delete_before_upload( time.sleep(4) client.timeline_compact(tenant_id, timeline_id) - client.configure_failpoints(("flush-frozen-before-sync", "off")) + client.configure_failpoints(("flush-frozen-pausable", "off")) conflict = q.get()