diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index e5fedeb73e..c6ceae500b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2703,7 +2703,7 @@ impl Timeline { // files instead. This is possible as long as *all* the data imported into the // repository have the same LSN. let lsn_range = frozen_layer.get_lsn_range(); - let layer_paths_to_upload = + let (layer_paths_to_upload, delta_layer_to_add) = if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) { #[cfg(test)] match &mut *self.flush_loop_state.lock().unwrap() { @@ -2722,8 +2722,12 @@ impl Timeline { let (partitioning, _lsn) = self .repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx) .await?; - self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx) - .await? + // For image layers, we add them immediately into the layer map. + ( + self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx) + .await?, + None, + ) } else { #[cfg(test)] match &mut *self.flush_loop_state.lock().unwrap() { @@ -2737,35 +2741,50 @@ impl Timeline { assert!(!*expect_initdb_optimization, "expected initdb optimization"); } } - // normal case, write out a L0 delta layer file. - let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?; - HashMap::from([(delta_path, metadata)]) + // Normal case, write out a L0 delta layer file. + // `create_delta_layer` will not modify the layer map. + // We will remove frozen layer and add delta layer in one atomic operation later. + let layer = self.create_delta_layer(&frozen_layer).await?; + ( + HashMap::from([(layer.filename(), LayerFileMetadata::new(layer.file_size()))]), + Some(layer), + ) }; - // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, - // a compaction can delete the file and then it won't be available for uploads any more. - // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this - // race situation. - // See https://github.com/neondatabase/neon/issues/4526 - - pausable_failpoint!("flush-frozen-before-sync"); - // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. The flushed layer is stored in // the mapping in `create_delta_layer`. { let mut guard = self.layers.write().await; - let l = guard.layer_map_mut().frozen_layers.pop_front(); - // Only one thread may call this function at a time (for this - // timeline). If two threads tried to flush the same frozen - // layer to disk at the same time, that would not work. - assert!(compare_arced_layers(&l.unwrap(), &frozen_layer)); + if let Some(ref l) = delta_layer_to_add { + // TODO: move access stats, metrics update, etc. into layer manager. + l.access_stats().record_residence_event( + &guard, + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + // update metrics + let sz = l.file_size(); + self.metrics.resident_physical_size_gauge.add(sz); + self.metrics.num_persistent_files_created.inc_by(1); + self.metrics.persistent_bytes_written.inc_by(sz); + } + + guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer); // release lock on 'layers' } - fail_point!("checkpoint-after-sync"); + // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, + // a compaction can delete the file and then it won't be available for uploads any more. + // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this + // race situation. + // See https://github.com/neondatabase/neon/issues/4526 + pausable_failpoint!("flush-frozen-pausable"); + + // This failpoint is used by another test case `test_pageserver_recovery`. + fail_point!("flush-frozen-exit"); // Update the metadata file, with new 'disk_consistent_lsn' // @@ -2847,11 +2866,12 @@ impl Timeline { Ok(()) } - // Write out the given frozen in-memory layer as a new L0 delta file + // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked + // in layer map immediately. The caller is responsible to put it into the layer map. async fn create_delta_layer( self: &Arc, frozen_layer: &Arc, - ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { + ) -> anyhow::Result { let span = tracing::info_span!("blocking"); let new_delta: DeltaLayer = tokio::task::spawn_blocking({ let _g = span.entered(); @@ -2888,25 +2908,8 @@ impl Timeline { }) .await .context("spawn_blocking")??; - let new_delta_name = new_delta.filename(); - let sz = new_delta.desc.file_size; - // Add it to the layer map - let l = Arc::new(new_delta); - let mut guard = self.layers.write().await; - l.access_stats().record_residence_event( - &guard, - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); - guard.track_new_l0_delta_layer(l); - - // update metrics - self.metrics.resident_physical_size_gauge.add(sz); - self.metrics.num_persistent_files_created.inc_by(1); - self.metrics.persistent_bytes_written.inc_by(sz); - - Ok((new_delta_name, LayerFileMetadata::new(sz))) + Ok(new_delta) } async fn repartition( diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 77f5f38314..f6f0d533d1 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -194,10 +194,23 @@ impl LayerManager { updates.flush(); } - /// Insert into the layer map when a new delta layer is created, called from `create_delta_layer`. - pub fn track_new_l0_delta_layer(&mut self, delta_layer: Arc) { + /// Flush a frozen layer and add the written delta layer to the layer map. + pub fn finish_flush_l0_layer( + &mut self, + delta_layer: Option, + frozen_layer_for_check: &Arc, + ) { + let l = self.layer_map.frozen_layers.pop_front(); let mut updates = self.layer_map.batch_update(); - Self::insert_historic_layer(delta_layer, &mut updates, &mut self.layer_fmgr); + + // Only one thread may call this function at a time (for this + // timeline). If two threads tried to flush the same frozen + // layer to disk at the same time, that would not work. + assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check)); + + if let Some(delta_layer) = delta_layer { + Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr); + } updates.flush(); } diff --git a/test_runner/regress/test_ancestor_branch.py b/test_runner/regress/test_ancestor_branch.py index e8c1a2f34c..0e390ba9e5 100644 --- a/test_runner/regress/test_ancestor_branch.py +++ b/test_runner/regress/test_ancestor_branch.py @@ -20,7 +20,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): } ) - pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)")) + pageserver_http.configure_failpoints(("flush-frozen-pausable", "sleep(10000)")) endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant) branch0_cur = endpoint_branch0.connect().cursor() diff --git a/test_runner/regress/test_recovery.py b/test_runner/regress/test_recovery.py index 76e97a35a4..552825cf08 100644 --- a/test_runner/regress/test_recovery.py +++ b/test_runner/regress/test_recovery.py @@ -38,8 +38,8 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder): # Configure failpoints pageserver_http.configure_failpoints( [ - ("flush-frozen-before-sync", "sleep(2000)"), - ("checkpoint-after-sync", "exit"), + ("flush-frozen-pausable", "sleep(2000)"), + ("flush-frozen-exit", "exit"), ] ) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 13bc01f609..f1575ae4d3 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -815,7 +815,7 @@ def test_compaction_delete_before_upload( wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) # Now make the flushing hang and update one small piece of data - client.configure_failpoints(("flush-frozen-before-sync", "pause")) + client.configure_failpoints(("flush-frozen-pausable", "pause")) endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1") @@ -841,7 +841,7 @@ def test_compaction_delete_before_upload( time.sleep(4) client.timeline_compact(tenant_id, timeline_id) - client.configure_failpoints(("flush-frozen-before-sync", "off")) + client.configure_failpoints(("flush-frozen-pausable", "off")) conflict = q.get()