diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index bf2c48e56c..0aef3afacc 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -146,7 +146,7 @@ impl Repository for LayeredRepository {
         // Create the timeline directory, and write initial metadata to file.
         crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
 
-        let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0));
+        let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), Lsn(0));
         Self::save_metadata(self.conf, timelineid, self.tenantid, &metadata, true)?;
 
         let timeline = LayeredTimeline::new(
@@ -168,7 +168,19 @@ impl Repository for LayeredRepository {
     /// Branch a timeline
     fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()> {
-        let src_timeline = self.get_timeline(src)?;
+        // We need to hold this lock to prevent GC from starting at the same time. GC scans the
+        // directory to learn about timelines, so otherwise a race is possible where we create a
+        // new timeline while GC concurrently removes data that the new timeline needs.
+        let mut timelines = self.timelines.lock().unwrap();
+        let src_timeline = self.get_timeline_locked(src, &mut timelines)?;
+
+        let latest_gc_cutoff = src_timeline.latest_gc_cutoff.load();
+        ensure!(
+            start_lsn >= latest_gc_cutoff,
+            "Branch start LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
+            start_lsn,
+            latest_gc_cutoff,
+        );
 
         let RecordLsn {
             last: src_last,
@@ -185,11 +197,20 @@ impl Repository for LayeredRepository {
         // Create the metadata file, noting the ancestor of the new timeline.
         // There is initially no data in it, but all the read-calls know to look
         // into the ancestor.
-        let metadata = TimelineMetadata::new(start_lsn, dst_prev, Some(src), start_lsn);
+        let metadata = TimelineMetadata::new(
+            start_lsn,
+            dst_prev,
+            Some(src),
+            start_lsn,
+            src_timeline.latest_gc_cutoff.load(),
+        );
         crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenantid))?;
         Self::save_metadata(self.conf, dst, self.tenantid, &metadata, true)?;
-        info!("branched timeline {} from {} at {}", dst, src, start_lsn);
+        info!(
+            "branched timeline {} from {} at {} latest_gc_cutoff {}",
+            dst, src, start_lsn, latest_gc_cutoff
+        );
 
         Ok(())
     }
@@ -552,6 +573,9 @@ pub struct LayeredTimeline {
     /// Must always be acquired before the layer map/individual layer lock
     /// to avoid deadlock.
     write_lock: Mutex<()>,
+
+    // Needed to ensure that we can't create a branch at a point that was already garbage collected
+    latest_gc_cutoff: AtomicLsn,
 }
 
 /// Public interface functions
@@ -591,6 +615,13 @@ impl Timeline for LayeredTimeline {
             );
         }
         debug_assert!(lsn <= self.get_last_record_lsn());
+        let latest_gc_cutoff = self.latest_gc_cutoff.load();
+        // use an error instead of an assert to simplify testing
+        ensure!(
+            lsn >= latest_gc_cutoff,
+            "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
+            lsn, latest_gc_cutoff
+        );
 
         let seg = SegmentTag::from_blknum(rel, blknum);
@@ -857,6 +888,8 @@ impl LayeredTimeline {
             upload_relishes,
 
             write_lock: Mutex::new(()),
+
+            latest_gc_cutoff: AtomicLsn::from(metadata.latest_gc_cutoff()),
         };
         Ok(timeline)
     }
@@ -1237,6 +1270,7 @@ impl LayeredTimeline {
             ondisk_prev_record_lsn,
             ancestor_timelineid,
             self.ancestor_lsn,
+            self.latest_gc_cutoff.load(),
         );
 
         LayeredRepository::save_metadata(
@@ -1334,6 +1368,10 @@ impl LayeredTimeline {
         let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
 
+        // We need to ensure that no one branches at a point before latest_gc_cutoff.
+        // See branch_timeline() for details.
+        self.latest_gc_cutoff.store(cutoff);
+
         info!("GC starting");
 
         debug!("retain_lsns: {:?}", retain_lsns);
@@ -1495,11 +1533,12 @@ impl LayeredTimeline {
 
                 // We didn't find any reason to keep this file, so remove it.
                 info!(
-                    "garbage collecting {} {}-{} {}",
+                    "garbage collecting {} {}-{} is_dropped: {} is_incremental: {}",
                     l.get_seg_tag(),
                     l.get_start_lsn(),
                     l.get_end_lsn(),
-                    l.is_dropped()
+                    l.is_dropped(),
+                    l.is_incremental(),
                 );
                 layers_to_remove.push(Arc::clone(&l));
             }
@@ -1582,12 +1621,19 @@ impl LayeredTimeline {
         let mut layer_ref = layer;
         let mut curr_lsn = lsn;
         loop {
-            match layer_ref.get_page_reconstruct_data(
-                blknum,
-                curr_lsn,
-                cached_lsn_opt,
-                &mut data,
-            )? {
+            let result = layer_ref
+                .get_page_reconstruct_data(blknum, curr_lsn, cached_lsn_opt, &mut data)
+                .with_context(|| {
+                    format!(
+                        "Failed to get reconstruct data {} {:?} {} {} {:?}",
+                        layer_ref.get_seg_tag(),
+                        layer_ref.filename(),
+                        blknum,
+                        curr_lsn,
+                        cached_lsn_opt,
+                    )
+                })?;
+            match result {
                 PageReconstructResult::Complete => break,
                 PageReconstructResult::Continue(cont_lsn) => {
                     // Fetch base image / more WAL from the returned predecessor layer
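
Note on the locking scheme above: branch_timeline() now reads latest_gc_cutoff only while
holding the repository's timelines lock, and GC must take that same lock before it starts,
so the cutoff cannot advance between the check and the metadata write. A minimal sketch of
the pattern, using stand-in types (plain u64 LSNs, a String error, a simplified timeline
map) rather than the real pageserver structures:

    use std::collections::HashMap;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Mutex;

    type TimelineId = u64; // stand-in for ZTimelineId

    struct Timeline {
        // GC publishes its cutoff here before removing any layers.
        latest_gc_cutoff: AtomicU64,
    }

    struct Repo {
        timelines: Mutex<HashMap<TimelineId, Timeline>>,
    }

    impl Repo {
        // Mirrors branch_timeline(): validate and create the branch while
        // holding the same lock that GC takes before it starts.
        fn branch_timeline(&self, src: TimelineId, start_lsn: u64) -> Result<(), String> {
            let timelines = self.timelines.lock().unwrap();
            let src_tl = timelines.get(&src).ok_or("no such timeline")?;
            let cutoff = src_tl.latest_gc_cutoff.load(Ordering::SeqCst);
            if start_lsn < cutoff {
                return Err(format!(
                    "branch start {:#x} is below the GC cutoff {:#x}",
                    start_lsn, cutoff
                ));
            }
            // ... write the new timeline's metadata while the lock is still held ...
            Ok(())
        }
    }

    fn main() {
        let tl = Timeline { latest_gc_cutoff: AtomicU64::new(0x40) };
        let repo = Repo { timelines: Mutex::new(HashMap::from([(1, tl)])) };
        assert!(repo.branch_timeline(1, 0x25).is_err()); // below cutoff, rejected
        assert!(repo.branch_timeline(1, 0x50).is_ok());
    }
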
diff --git a/pageserver/src/layered_repository/metadata.rs b/pageserver/src/layered_repository/metadata.rs
index b660b5506a..61b402e54c 100644
--- a/pageserver/src/layered_repository/metadata.rs
+++ b/pageserver/src/layered_repository/metadata.rs
@@ -42,6 +42,7 @@ pub struct TimelineMetadata {
     prev_record_lsn: Option<Lsn>,
     ancestor_timeline: Option<ZTimelineId>,
     ancestor_lsn: Lsn,
+    latest_gc_cutoff: Lsn,
 }
 
 /// Points to a place in pageserver's local directory,
@@ -61,12 +62,14 @@ impl TimelineMetadata {
         prev_record_lsn: Option<Lsn>,
         ancestor_timeline: Option<ZTimelineId>,
         ancestor_lsn: Lsn,
+        latest_gc_cutoff: Lsn,
     ) -> Self {
         Self {
             disk_consistent_lsn,
             prev_record_lsn,
             ancestor_timeline,
             ancestor_lsn,
+            latest_gc_cutoff,
         }
     }
 
@@ -121,6 +124,10 @@ impl TimelineMetadata {
     pub fn ancestor_lsn(&self) -> Lsn {
         self.ancestor_lsn
     }
+
+    pub fn latest_gc_cutoff(&self) -> Lsn {
+        self.latest_gc_cutoff
+    }
 }
 
 /// This module is for direct conversion of metadata to bytes and back.
@@ -139,6 +146,7 @@ mod serialize {
         prev_record_lsn: &'a Option<Lsn>,
         ancestor_timeline: &'a Option<ZTimelineId>,
         ancestor_lsn: &'a Lsn,
+        latest_gc_cutoff: &'a Lsn,
     }
 
     impl<'a> From<&'a TimelineMetadata> for SeTimelineMetadata<'a> {
@@ -148,6 +156,7 @@ mod serialize {
             prev_record_lsn: &other.prev_record_lsn,
             ancestor_timeline: &other.ancestor_timeline,
             ancestor_lsn: &other.ancestor_lsn,
+            latest_gc_cutoff: &other.latest_gc_cutoff,
         }
     }
 }
@@ -158,6 +167,7 @@ mod serialize {
         prev_record_lsn: Option<Lsn>,
         ancestor_timeline: Option<ZTimelineId>,
         ancestor_lsn: Lsn,
+        latest_gc_cutoff: Lsn,
     }
 
     impl From<DeTimelineMetadata> for TimelineMetadata {
@@ -167,6 +177,7 @@ mod serialize {
             prev_record_lsn: other.prev_record_lsn,
             ancestor_timeline: other.ancestor_timeline,
             ancestor_lsn: other.ancestor_lsn,
+            latest_gc_cutoff: other.latest_gc_cutoff,
         }
     }
 }
@@ -185,6 +196,7 @@ mod tests {
            prev_record_lsn: Some(Lsn(0x100)),
            ancestor_timeline: Some(TIMELINE_ID),
            ancestor_lsn: Lsn(0),
+           latest_gc_cutoff: Lsn(0),
        };
 
        let metadata_bytes = original_metadata
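
The serializer's Se/De mirror structs must list the same field as TimelineMetadata itself,
or the new value would be silently dropped in one direction of the conversion. A hedged
sketch of that round-trip property, using serde with bincode purely for illustration
(metadata.rs has its own byte format and checksum handling, and DemoMetadata's types are
simplified stand-ins):

    use serde::{Deserialize, Serialize};

    // The field list mirrors TimelineMetadata; types are simplified stand-ins.
    #[derive(Serialize, Deserialize, Debug, PartialEq)]
    struct DemoMetadata {
        disk_consistent_lsn: u64,
        prev_record_lsn: Option<u64>,
        ancestor_timeline: Option<[u8; 16]>,
        ancestor_lsn: u64,
        latest_gc_cutoff: u64, // forgetting this in either mirror struct loses the value
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let original = DemoMetadata {
            disk_consistent_lsn: 0x200,
            prev_record_lsn: Some(0x100),
            ancestor_timeline: None,
            ancestor_lsn: 0,
            latest_gc_cutoff: 0x40,
        };
        let bytes = bincode::serialize(&original)?;
        let restored: DemoMetadata = bincode::deserialize(&bytes)?;
        assert_eq!(original, restored); // the new field survives the round trip
        Ok(())
    }
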
diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs
index 6b6b5fb12f..96d5d2698f 100644
--- a/pageserver/src/remote_storage/storage_sync.rs
+++ b/pageserver/src/remote_storage/storage_sync.rs
@@ -1562,6 +1562,6 @@ mod tests {
     }
 
     fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
-        TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0))
+        TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0))
     }
 }
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 66d930bb85..7ba608e001 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -769,6 +769,70 @@ mod tests {
         Ok(())
     }
 
+    fn make_some_layers(tline: &Arc<dyn Timeline>) -> Result<()> {
+        {
+            let writer = tline.writer();
+            // Create a relation on the timeline
+            writer.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
+            writer.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?;
+            writer.advance_last_record_lsn(Lsn(0x30));
+        }
+        // materialize layers, lsn 0x20-0x30
+        tline.checkpoint(CheckpointConfig::Forced)?;
+        {
+            let writer = tline.writer();
+            writer.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("foo blk 0 at 4"))?;
+            writer.put_page_image(TESTREL_A, 0, Lsn(0x50), TEST_IMG("foo blk 0 at 5"))?;
+            writer.advance_last_record_lsn(Lsn(0x50));
+        }
+        // materialize layers, lsn 0x40-0x50
+        tline.checkpoint(CheckpointConfig::Forced)
+    }
+
+    #[test]
+    fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> {
+        let repo =
+            RepoHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?.load();
+
+        let tline = repo.create_empty_timeline(TIMELINE_ID)?;
+        make_some_layers(&tline)?;
+
+        // this removes layers before lsn 0x40 (0x50 minus the 0x10 horizon), leaving two
+        // layers: the image and delta covering 0x31-0x50
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+
+        // try to branch at lsn 0x25, should fail because we already garbage collected the data
+        match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
+            Ok(_) => panic!("branching should have failed"),
+            Err(err) => assert!(err
+                .to_string()
+                .contains("we might've already garbage collected needed data")),
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_prohibit_get_page_at_lsn_for_garbage_collected_pages() -> Result<()> {
+        let repo =
+            RepoHarness::create("test_prohibit_get_page_at_lsn_for_garbage_collected_pages")?
+                .load();
+
+        let tline = repo.create_empty_timeline(TIMELINE_ID)?;
+        make_some_layers(&tline)?;
+
+        // this removes layers before lsn 0x40 (0x50 minus the 0x10 horizon), leaving two
+        // layers: the image and delta covering 0x31-0x50
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+
+        match tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x25)) {
+            Ok(_) => panic!("request for page should have failed"),
+            Err(err) => assert!(err
+                .to_string()
+                .contains("tried to request a page version that was garbage collected")),
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn corrupt_metadata() -> Result<()> {
         const TEST_NAME: &str = "corrupt_metadata";
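
Both new tests assert on the error text produced by the ensure! calls rather than on a
typed error. A condensed, stand-alone sketch of that pattern with anyhow, where
check_branch_point is a hypothetical stand-in for the check inside branch_timeline():

    use anyhow::{ensure, Result};

    // Hypothetical stand-in for the check inside branch_timeline().
    fn check_branch_point(start_lsn: u64, latest_gc_cutoff: u64) -> Result<()> {
        ensure!(
            start_lsn >= latest_gc_cutoff,
            "Branch start LSN {:#x} is earlier than latest GC horizon {:#x} (we might've already garbage collected needed data)",
            start_lsn,
            latest_gc_cutoff
        );
        Ok(())
    }

    #[test]
    fn branch_below_cutoff_is_rejected() {
        let err = check_branch_point(0x25, 0x40).unwrap_err();
        assert!(err
            .to_string()
            .contains("we might've already garbage collected needed data"));
    }
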
diff --git a/test_runner/batch_others/test_readonly_node.py b/test_runner/batch_others/test_readonly_node.py
index 904b5da5c9..850f32fba9 100644
--- a/test_runner/batch_others/test_readonly_node.py
+++ b/test_runner/batch_others/test_readonly_node.py
@@ -1,4 +1,5 @@
-import subprocess
+import pytest
+from fixtures.log_helper import log
 from fixtures.zenith_fixtures import ZenithEnv
 
 pytest_plugins = ("fixtures.zenith_fixtures")
@@ -15,13 +16,14 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
     env.zenith_cli(["branch", "test_readonly_node", "empty"])
 
     pgmain = env.postgres.create_start('test_readonly_node')
-    print("postgres is running on 'test_readonly_node' branch")
+    log.info("postgres is running on 'test_readonly_node' branch")
 
     main_pg_conn = pgmain.connect()
     main_cur = main_pg_conn.cursor()
 
     # Create table, and insert the first 100 rows
     main_cur.execute('CREATE TABLE foo (t text)')
+
     main_cur.execute('''
         INSERT INTO foo
             SELECT 'long string to consume some space' || g
@@ -29,7 +31,7 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
     ''')
     main_cur.execute('SELECT pg_current_wal_insert_lsn()')
     lsn_a = main_cur.fetchone()[0]
-    print('LSN after 100 rows: ' + lsn_a)
+    log.info('LSN after 100 rows: ' + lsn_a)
 
     # Insert some more rows. (This generates enough WAL to fill a few segments.)
     main_cur.execute('''
@@ -39,7 +41,7 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
     ''')
     main_cur.execute('SELECT pg_current_wal_insert_lsn()')
     lsn_b = main_cur.fetchone()[0]
-    print('LSN after 200100 rows: ' + lsn_b)
+    log.info('LSN after 200100 rows: ' + lsn_b)
 
     # Insert many more rows. This generates enough WAL to fill a few segments.
     main_cur.execute('''
@@ -50,7 +52,7 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
 
     main_cur.execute('SELECT pg_current_wal_insert_lsn()')
     lsn_c = main_cur.fetchone()[0]
-    print('LSN after 400100 rows: ' + lsn_c)
+    log.info('LSN after 400100 rows: ' + lsn_c)
 
     # Create first read-only node at the point where only 100 rows were inserted
     pg_hundred = env.postgres.create_start("test_readonly_node_hundred",
@@ -84,8 +86,6 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
         assert cur.fetchone() == (1, )
 
     # Create node at pre-initdb lsn
-    try:
+    with pytest.raises(Exception, match='extracting base backup failed'):
+        # compute node startup with invalid LSN should fail
         env.zenith_cli(["pg", "start", "test_branch_preinitdb", "test_readonly_node@0/42"])
-        assert False, "compute node startup with invalid LSN should have failed"
-    except Exception:
-        print("Node creation with pre-initdb LSN failed (as expected)")
diff --git a/test_runner/batch_others/test_snapfiles_gc.py b/test_runner/batch_others/test_snapfiles_gc.py
index 9b57d5acbe..1f0adc1a15 100644
--- a/test_runner/batch_others/test_snapfiles_gc.py
+++ b/test_runner/batch_others/test_snapfiles_gc.py
@@ -1,22 +1,13 @@
 from contextlib import closing
 import psycopg2.extras
 import time
+from fixtures.utils import print_gc_result
 from fixtures.zenith_fixtures import ZenithEnv
 from fixtures.log_helper import log
 
 pytest_plugins = ("fixtures.zenith_fixtures")
 
-def print_gc_result(row):
-    log.info("GC duration {elapsed} ms".format_map(row))
-    log.info(
-        "  REL    total: {layer_relfiles_total}, needed_by_cutoff {layer_relfiles_needed_by_cutoff}, needed_by_branches: {layer_relfiles_needed_by_branches}, not_updated: {layer_relfiles_not_updated}, needed_as_tombstone {layer_relfiles_needed_as_tombstone}, removed: {layer_relfiles_removed}, dropped: {layer_relfiles_dropped}"
-        .format_map(row))
-    log.info(
-        "  NONREL total: {layer_nonrelfiles_total}, needed_by_cutoff {layer_nonrelfiles_needed_by_cutoff}, needed_by_branches: {layer_nonrelfiles_needed_by_branches}, not_updated: {layer_nonrelfiles_not_updated}, needed_as_tombstone {layer_nonrelfiles_needed_as_tombstone}, removed: {layer_nonrelfiles_removed}, dropped: {layer_nonrelfiles_dropped}"
-        .format_map(row))
-
-
 #
 # Test Garbage Collection of old layer files
 #
diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index 51005e3c48..236c225bfb 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -69,3 +69,13 @@ def lsn_from_hex(lsn_hex: str) -> int:
     """ Convert lsn from hex notation to int. """
     l, r = lsn_hex.split('/')
     return (int(l, 16) << 32) + int(r, 16)
+
+
+def print_gc_result(row):
+    log.info("GC duration {elapsed} ms".format_map(row))
+    log.info(
+        "  REL    total: {layer_relfiles_total}, needed_by_cutoff {layer_relfiles_needed_by_cutoff}, needed_by_branches: {layer_relfiles_needed_by_branches}, not_updated: {layer_relfiles_not_updated}, needed_as_tombstone {layer_relfiles_needed_as_tombstone}, removed: {layer_relfiles_removed}, dropped: {layer_relfiles_dropped}"
+        .format_map(row))
+    log.info(
+        "  NONREL total: {layer_nonrelfiles_total}, needed_by_cutoff {layer_nonrelfiles_needed_by_cutoff}, needed_by_branches: {layer_nonrelfiles_needed_by_branches}, not_updated: {layer_nonrelfiles_not_updated}, needed_as_tombstone {layer_nonrelfiles_needed_as_tombstone}, removed: {layer_nonrelfiles_removed}, dropped: {layer_nonrelfiles_dropped}"
+        .format_map(row))
""" l, r = lsn_hex.split('/') return (int(l, 16) << 32) + int(r, 16) + + +def print_gc_result(row): + log.info("GC duration {elapsed} ms".format_map(row)) + log.info( + " REL total: {layer_relfiles_total}, needed_by_cutoff {layer_relfiles_needed_by_cutoff}, needed_by_branches: {layer_relfiles_needed_by_branches}, not_updated: {layer_relfiles_not_updated}, needed_as_tombstone {layer_relfiles_needed_as_tombstone}, removed: {layer_relfiles_removed}, dropped: {layer_relfiles_dropped}" + .format_map(row)) + log.info( + " NONREL total: {layer_nonrelfiles_total}, needed_by_cutoff {layer_nonrelfiles_needed_by_cutoff}, needed_by_branches: {layer_nonrelfiles_needed_by_branches}, not_updated: {layer_nonrelfiles_not_updated}, needed_as_tombstone {layer_nonrelfiles_needed_as_tombstone}, removed: {layer_nonrelfiles_removed}, dropped: {layer_nonrelfiles_dropped}" + .format_map(row)) diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index ffe49e9f38..c09d8c67ce 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -203,6 +203,12 @@ impl AtomicLsn { } } +impl From for AtomicLsn { + fn from(lsn: Lsn) -> Self { + Self::new(lsn.0) + } +} + /// Pair of LSN's pointing to the end of the last valid record and previous one #[derive(Debug, Clone, Copy)] pub struct RecordLsn {