diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index b02ab00a21..24f9bcff37 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -74,6 +74,7 @@ pub mod metadata;
 mod par_fsync;
 mod storage_layer;
 
+use crate::pgdatadir_mapping::LsnForTimestamp;
 use delta_layer::{DeltaLayer, DeltaLayerWriter};
 use ephemeral_file::is_ephemeral_file;
 use filename::{DeltaFileName, ImageFileName};
@@ -81,6 +82,7 @@ use image_layer::{ImageLayer, ImageLayerWriter};
 use inmemory_layer::InMemoryLayer;
 use layer_map::LayerMap;
 use layer_map::SearchResult;
+use postgres_ffi::xlog_utils::to_pg_timestamp;
 use storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
 
 // re-export this function so that page_cache.rs can use it.
@@ -2118,11 +2120,49 @@ impl LayeredTimeline {
         let cutoff = gc_info.cutoff;
         let pitr = gc_info.pitr;
 
+        // Calculate pitr cutoff point.
+        // By default, we don't want to GC anything.
+        let mut pitr_cutoff_lsn: Lsn = *self.get_latest_gc_cutoff_lsn();
+
+        if let Ok(timeline) =
+            tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
+        {
+            // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
+            // If we don't have enough data to convert to LSN,
+            // play safe and don't remove any layers.
+            if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
+                let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
+
+                match timeline.find_lsn_for_timestamp(pitr_timestamp)? {
+                    LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn,
+                    LsnForTimestamp::Future(lsn) => {
+                        debug!("future({})", lsn);
+                    }
+                    LsnForTimestamp::Past(lsn) => {
+                        debug!("past({})", lsn);
+                    }
+                }
+                debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn)
+            }
+        } else {
+            // We don't have local timeline in mocked cargo tests.
+            // So, just ignore pitr_interval setting in this case.
+            pitr_cutoff_lsn = cutoff;
+        }
+
+        let new_gc_cutoff = Lsn::min(cutoff, pitr_cutoff_lsn);
+
+        // Nothing to GC. Return early.
+        if *self.get_latest_gc_cutoff_lsn() == new_gc_cutoff {
+            result.elapsed = now.elapsed()?;
+            return Ok(result);
+        }
+
         let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered();
 
         // We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
         // See branch_timeline() for details.
-        *self.latest_gc_cutoff_lsn.write().unwrap() = cutoff;
+        *self.latest_gc_cutoff_lsn.write().unwrap() = new_gc_cutoff;
 
         info!("GC starting");
 
@@ -2162,30 +2202,18 @@ impl LayeredTimeline {
                 result.layers_needed_by_cutoff += 1;
                 continue 'outer;
             }
-            // 2. It is newer than PiTR interval?
-            // We use modification time of layer file to estimate update time.
-            // This estimation is not quite precise but maintaining LSN->timestamp map seems to be overkill.
-            // It is not expected that users will need high precision here. And this estimation
-            // is conservative: modification time of file is always newer than actual time of version
-            // creation. So it is safe for users.
-            // TODO A possible "bloat" issue still persists here.
-            // If modification time changes because of layer upload/download, we will keep these files
-            // longer than necessary.
-            // https://github.com/neondatabase/neon/issues/1554
-            //
-            if let Ok(metadata) = fs::metadata(&l.filename()) {
-                let last_modified = metadata.modified()?;
-                if now.duration_since(last_modified)? < pitr {
-                    debug!(
-                        "keeping {} because it's modification time {:?} is newer than PITR {:?}",
-                        l.filename().display(),
-                        last_modified,
-                        pitr
-                    );
-                    result.layers_needed_by_pitr += 1;
-                    continue 'outer;
-                }
+
+            // 2. It is newer than PiTR cutoff point?
+            if l.get_lsn_range().end > pitr_cutoff_lsn {
+                debug!(
+                    "keeping {} because it's newer than pitr_cutoff_lsn {}",
+                    l.filename().display(),
+                    pitr_cutoff_lsn
+                );
+                result.layers_needed_by_pitr += 1;
+                continue 'outer;
             }
+
             // 3. Is it needed by a child branch?
             // NOTE With that wee would keep data that
             //     might be referenced by child branches forever.
diff --git a/test_runner/batch_others/test_pitr_gc.py b/test_runner/batch_others/test_pitr_gc.py
new file mode 100644
index 0000000000..fe9159b4bb
--- /dev/null
+++ b/test_runner/batch_others/test_pitr_gc.py
@@ -0,0 +1,77 @@
+import subprocess
+from contextlib import closing
+
+import psycopg2.extras
+import pytest
+from fixtures.log_helper import log
+from fixtures.utils import print_gc_result
+from fixtures.zenith_fixtures import ZenithEnvBuilder
+
+
+#
+# Check pitr_interval GC behavior.
+# Insert some data, run GC and create a branch in the past.
+#
+def test_pitr_gc(zenith_env_builder: ZenithEnvBuilder):
+
+    zenith_env_builder.num_safekeepers = 1
+    # Set pitr interval such that we need to keep the data
+    zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '1day', gc_horizon = 0}"
+
+    env = zenith_env_builder.init_start()
+    pgmain = env.postgres.create_start('main')
+    log.info("postgres is running on 'main' branch")
+
+    main_pg_conn = pgmain.connect()
+    main_cur = main_pg_conn.cursor()
+
+    main_cur.execute("SHOW zenith.zenith_timeline")
+    timeline = main_cur.fetchone()[0]
+
+    # Create table
+    main_cur.execute('CREATE TABLE foo (t text)')
+
+    for i in range(10000):
+        main_cur.execute('''
+            INSERT INTO foo
+                SELECT 'long string to consume some space';
+        ''')
+
+        if i == 99:
+            # keep some early lsn to test branch creation after GC
+            main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
+            res = main_cur.fetchone()
+            lsn_a = res[0]
+            xid_a = res[1]
+            log.info(f'LSN after 100 rows: {lsn_a} xid {xid_a}')
+
+    main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
+    res = main_cur.fetchone()
+    debug_lsn = res[0]
+    debug_xid = res[1]
+    log.info(f'LSN after 10000 rows: {debug_lsn} xid {debug_xid}')
+
+    # run GC
+    with closing(env.pageserver.connect()) as psconn:
+        with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
+            pscur.execute(f"compact {env.initial_tenant.hex} {timeline}")
+            # perform aggressive GC. Data still should be kept because of the PITR setting.
+            pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0")
+            row = pscur.fetchone()
+            print_gc_result(row)
+
+    # Branch at the point where only 100 rows were inserted
+    # It must have been preserved by PITR setting
+    env.zenith_cli.create_branch('test_pitr_gc_hundred', 'main', ancestor_start_lsn=lsn_a)
+
+    pg_hundred = env.postgres.create_start('test_pitr_gc_hundred')
+
+    # On the 'hundred' branch, we should see only 100 rows
+    hundred_pg_conn = pg_hundred.connect()
+    hundred_cur = hundred_pg_conn.cursor()
+    hundred_cur.execute('SELECT count(*) FROM foo')
+    assert hundred_cur.fetchone() == (100, )
+
+    # All the rows are visible on the main branch
+    main_cur.execute('SELECT count(*) FROM foo')
+    assert main_cur.fetchone() == (10000, )
diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index 98af511036..7b95e729d9 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -75,7 +75,8 @@ def lsn_from_hex(lsn_hex: str) -> int:
 def print_gc_result(row):
     log.info("GC duration {elapsed} ms".format_map(row))
     log.info(
-        "  total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
+        "  total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr},"
+        " needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
         .format_map(row))
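A minimal, self-contained sketch (not part of the patch) of the cutoff selection the Rust hunks above introduce. It uses plain u64 values for LSNs and a hypothetical find_lsn_for_timestamp stand-in; the real code uses the Lsn type, Timeline::find_lsn_for_timestamp and to_pg_timestamp, and the concrete numbers below are made up. The idea: the effective GC cutoff is the older of the space-based cutoff and the LSN corresponding to now - pitr_interval; if that LSN cannot be resolved, fall back to the current latest_gc_cutoff_lsn so nothing new is collected, and a layer survives GC whenever its LSN range ends above the effective cutoff.

    use std::time::{Duration, SystemTime};

    // Hypothetical stand-in for Timeline::find_lsn_for_timestamp(): map a
    // wall-clock timestamp to an LSN, or None if there is not enough history.
    fn find_lsn_for_timestamp(_ts: SystemTime) -> Option<u64> {
        Some(0x2000)
    }

    // Effective GC cutoff: never newer than the space-based `cutoff`, and never
    // newer than the LSN matching `now - pitr`. If that LSN cannot be resolved,
    // keep the current `latest_gc_cutoff` (i.e. GC nothing new).
    fn effective_gc_cutoff(cutoff: u64, latest_gc_cutoff: u64, pitr: Duration) -> u64 {
        let pitr_cutoff_lsn = SystemTime::now()
            .checked_sub(pitr)
            .and_then(find_lsn_for_timestamp)
            .unwrap_or(latest_gc_cutoff);
        cutoff.min(pitr_cutoff_lsn)
    }

    fn main() {
        let new_gc_cutoff = effective_gc_cutoff(0x5000, 0x1000, Duration::from_secs(24 * 3600));
        // A layer is kept if its LSN range ends above the effective cutoff.
        let layer_end_lsn: u64 = 0x3000;
        println!("keep layer: {}", layer_end_lsn > new_gc_cutoff);
    }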