diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 0f8e60f8d3..1728c7be32 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1220,78 +1220,76 @@ impl Timeline {
         // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
         // *all* the layers, to avoid fsyncing the file multiple times.
         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
-        self.update_disk_consistent_lsn(disk_consistent_lsn, layer_paths_to_upload)?;
+        let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
+        // If we were able to advance 'disk_consistent_lsn', save it the metadata file.
+        // After crash, we will restart WAL streaming and processing from that point.
+        if disk_consistent_lsn != old_disk_consistent_lsn {
+            assert!(disk_consistent_lsn > old_disk_consistent_lsn);
+            self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)?;
+            // Also update the in-memory copy
+            self.disk_consistent_lsn.store(disk_consistent_lsn);
+        }
 
         Ok(())
     }
 
     /// Update metadata file
-    fn update_disk_consistent_lsn(
+    fn update_metadata_file(
        &self,
        disk_consistent_lsn: Lsn,
        layer_paths_to_upload: HashMap,
     ) -> Result<()> {
-        // If we were able to advance 'disk_consistent_lsn', save it the metadata file.
-        // After crash, we will restart WAL streaming and processing from that point.
-        let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
-        if disk_consistent_lsn != old_disk_consistent_lsn {
-            assert!(disk_consistent_lsn > old_disk_consistent_lsn);
+        // We can only save a valid 'prev_record_lsn' value on disk if we
+        // flushed *all* in-memory changes to disk. We only track
+        // 'prev_record_lsn' in memory for the latest processed record, so we
+        // don't remember what the correct value that corresponds to some old
+        // LSN is. But if we flush everything, then the value corresponding
+        // current 'last_record_lsn' is correct and we can store it on disk.
+        let RecordLsn {
+            last: last_record_lsn,
+            prev: prev_record_lsn,
+        } = self.last_record_lsn.load();
+        let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
+            Some(prev_record_lsn)
+        } else {
+            None
+        };
 
-            // We can only save a valid 'prev_record_lsn' value on disk if we
-            // flushed *all* in-memory changes to disk. We only track
-            // 'prev_record_lsn' in memory for the latest processed record, so we
-            // don't remember what the correct value that corresponds to some old
-            // LSN is. But if we flush everything, then the value corresponding
-            // current 'last_record_lsn' is correct and we can store it on disk.
-            let RecordLsn {
-                last: last_record_lsn,
-                prev: prev_record_lsn,
-            } = self.last_record_lsn.load();
-            let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
-                Some(prev_record_lsn)
-            } else {
-                None
-            };
+        let ancestor_timeline_id = self
+            .ancestor_timeline
+            .as_ref()
+            .map(|ancestor| ancestor.timeline_id);
 
-            let ancestor_timeline_id = self
-                .ancestor_timeline
-                .as_ref()
-                .map(|ancestor| ancestor.timeline_id);
+        let metadata = TimelineMetadata::new(
+            disk_consistent_lsn,
+            ondisk_prev_record_lsn,
+            ancestor_timeline_id,
+            self.ancestor_lsn,
+            *self.latest_gc_cutoff_lsn.read(),
+            self.initdb_lsn,
+            self.pg_version,
+        );
 
-            let metadata = TimelineMetadata::new(
-                disk_consistent_lsn,
-                ondisk_prev_record_lsn,
-                ancestor_timeline_id,
-                self.ancestor_lsn,
-                *self.latest_gc_cutoff_lsn.read(),
-                self.initdb_lsn,
-                self.pg_version,
-            );
+        fail_point!("checkpoint-before-saving-metadata", |x| bail!(
+            "{}",
+            x.unwrap()
+        ));
 
-            fail_point!("checkpoint-before-saving-metadata", |x| bail!(
-                "{}",
-                x.unwrap()
-            ));
+        save_metadata(
+            self.conf,
+            self.timeline_id,
+            self.tenant_id,
+            &metadata,
+            false,
+        )?;
 
-            save_metadata(
-                self.conf,
-                self.timeline_id,
+        if self.upload_layers.load(atomic::Ordering::Relaxed) {
+            storage_sync::schedule_layer_upload(
                 self.tenant_id,
-                &metadata,
-                false,
-            )?;
-
-            if self.upload_layers.load(atomic::Ordering::Relaxed) {
-                storage_sync::schedule_layer_upload(
-                    self.tenant_id,
-                    self.timeline_id,
-                    layer_paths_to_upload,
-                    Some(metadata),
-                );
-            }
-
-            // Also update the in-memory copy
-            self.disk_consistent_lsn.store(disk_consistent_lsn);
+                self.timeline_id,
+                layer_paths_to_upload,
+                Some(metadata),
+            );
         }
 
         Ok(())
@@ -1961,6 +1959,9 @@ impl Timeline {
                 new_gc_cutoff
             );
             write_guard.store_and_unlock(new_gc_cutoff).wait();
+
+            // Persist metadata file
+            self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
         }
 
         info!("GC starting");
@@ -2087,6 +2088,18 @@ impl Timeline {
             result.layers_removed += 1;
         }
 
+        info!(
+            "GC completed removing {} layers, cutoff {}",
+            result.layers_removed, new_gc_cutoff
+        );
+        if result.layers_removed != 0 {
+            fail_point!("gc-before-save-metadata", |_| {
+                info!("Abnormally terminate pageserver at gc-before-save-metadata fail point");
+                std::process::abort();
+            });
+            return Ok(result);
+        }
+
         if self.upload_layers.load(atomic::Ordering::Relaxed) {
             storage_sync::schedule_layer_delete(
                 self.tenant_id,
diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs
index 8329b15c08..030055df6d 100644
--- a/pageserver/src/tenant_tasks.rs
+++ b/pageserver/src/tenant_tasks.rs
@@ -70,8 +70,10 @@ async fn compaction_loop(tenant_id: TenantId) {
         // Run compaction
         let mut sleep_duration = tenant.get_compaction_period();
         if let Err(e) = tenant.compaction_iteration() {
-            error!("Compaction failed, retrying: {e:#}");
             sleep_duration = wait_duration;
+            error!("Compaction failed, retrying in {:?}: {e:#}", sleep_duration);
+            #[cfg(feature = "testing")]
+            std::process::abort();
         }
 
         // Sleep
@@ -119,8 +121,10 @@ async fn gc_loop(tenant_id: TenantId) {
         if gc_horizon > 0 {
             if let Err(e) =
                 tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false)
             {
-                error!("Gc failed, retrying: {e:#}");
                 sleep_duration = wait_duration;
+                error!("Gc failed, retrying in {:?}: {e:#}", sleep_duration);
+                #[cfg(feature = "testing")]
+                std::process::abort();
             }
         }
diff --git a/test_runner/regress/test_gc_cutoff.py b/test_runner/regress/test_gc_cutoff.py
new file mode 100644
index 0000000000..946c689a30
--- /dev/null
+++ b/test_runner/regress/test_gc_cutoff.py
@@ -0,0 +1,38 @@
+import pytest
+from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
+from performance.test_perf_pgbench import get_scales_matrix
+
+
+# Test gc_cutoff
+#
+# This test sets a fail point at the end of GC and checks
+# that the pageserver restarts normally after it
+@pytest.mark.parametrize("scale", get_scales_matrix(10))
+def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, scale: int):
+    env = neon_env_builder.init_start()
+    pageserver_http = env.pageserver.http_client()
+
+    # Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
+    tenant_id, _ = env.neon_cli.create_tenant(
+        conf={
+            "gc_period": "10 s",
+            "gc_horizon": f"{1024 ** 2}",
+            "checkpoint_distance": f"{1024 ** 2}",
+            "compaction_target_size": f"{1024 ** 2}",
+            # set PITR interval to be small, so we can do GC
+            "pitr_interval": "1 s",
+        }
+    )
+    pg = env.postgres.create_start("main", tenant_id=tenant_id)
+    connstr = pg.connstr()
+    pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
+
+    pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
+
+    for i in range(5):
+        try:
+            pg_bin.run_capture(["pgbench", "-T100", connstr])
+        except Exception:
+            env.pageserver.stop()
+            env.pageserver.start()
+            pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
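
A possible follow-up for the new test, not part of the patch above: once the pageserver has been restarted after hitting the "gc-before-save-metadata" failpoint, a read-only pgbench pass against the same endpoint would confirm the timeline is still readable after recovery. A minimal sketch, reusing only the fixtures already imported by test_gc_cutoff.py (the -S flag is pgbench's standard select-only mode):

    # Hypothetical extra check at the end of test_gc_cutoff, not included in the diff above.
    # A select-only pgbench run fails if pages cannot be served after the failpoint-induced restarts.
    pg_bin.run_capture(["pgbench", "-S", "-T10", connstr])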