Fix pitr_interval check in GC:

Use timestamp->LSN mapping instead of file modification time.
Fix 'latest_gc_cutoff_lsn' - set it to the minimum of pitr_cutoff and gc_cutoff.
Add new test: test_pitr_gc
This commit is contained in:
Anastasia Lubennikova
2022-05-12 20:53:40 +03:00
parent bf899a57d9
commit aa7c601eca
3 changed files with 131 additions and 25 deletions

View File

@@ -74,6 +74,7 @@ pub mod metadata;
mod par_fsync;
mod storage_layer;
use crate::pgdatadir_mapping::LsnForTimestamp;
use delta_layer::{DeltaLayer, DeltaLayerWriter};
use ephemeral_file::is_ephemeral_file;
use filename::{DeltaFileName, ImageFileName};
@@ -81,6 +82,7 @@ use image_layer::{ImageLayer, ImageLayerWriter};
use inmemory_layer::InMemoryLayer;
use layer_map::LayerMap;
use layer_map::SearchResult;
use postgres_ffi::xlog_utils::to_pg_timestamp;
use storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
// re-export this function so that page_cache.rs can use it.
@@ -2118,11 +2120,49 @@ impl LayeredTimeline {
let cutoff = gc_info.cutoff;
let pitr = gc_info.pitr;
// Calculate pitr cutoff point.
// By default, we don't want to GC anything.
let mut pitr_cutoff_lsn: Lsn = *self.get_latest_gc_cutoff_lsn();
if let Ok(timeline) =
tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
{
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
// If we don't have enough data to convert to LSN,
// play safe and don't remove any layers.
if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
match timeline.find_lsn_for_timestamp(pitr_timestamp)? {
LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn,
LsnForTimestamp::Future(lsn) => {
debug!("future({})", lsn);
}
LsnForTimestamp::Past(lsn) => {
debug!("past({})", lsn);
}
}
debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn)
}
} else {
// We don't have local timeline in mocked cargo tests.
// So, just ignore pitr_interval setting in this case.
pitr_cutoff_lsn = cutoff;
}
let new_gc_cutoff = Lsn::min(cutoff, pitr_cutoff_lsn);
// Nothing to GC. Return early.
if *self.get_latest_gc_cutoff_lsn() == new_gc_cutoff {
result.elapsed = now.elapsed()?;
return Ok(result);
}
let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered();
// We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
// See branch_timeline() for details.
*self.latest_gc_cutoff_lsn.write().unwrap() = cutoff;
*self.latest_gc_cutoff_lsn.write().unwrap() = new_gc_cutoff;
info!("GC starting");
@@ -2162,30 +2202,18 @@ impl LayeredTimeline {
result.layers_needed_by_cutoff += 1;
continue 'outer;
}
// 2. It is newer than PiTR interval?
// We use modification time of layer file to estimate update time.
// This estimation is not quite precise but maintaining LSN->timestamp map seems to be overkill.
// It is not expected that users will need high precision here. And this estimation
// is conservative: modification time of file is always newer than actual time of version
// creation. So it is safe for users.
// TODO A possible "bloat" issue still persists here.
// If modification time changes because of layer upload/download, we will keep these files
// longer than necessary.
// https://github.com/neondatabase/neon/issues/1554
//
if let Ok(metadata) = fs::metadata(&l.filename()) {
let last_modified = metadata.modified()?;
if now.duration_since(last_modified)? < pitr {
debug!(
"keeping {} because it's modification time {:?} is newer than PITR {:?}",
l.filename().display(),
last_modified,
pitr
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
// 2. It is newer than PiTR cutoff point?
if l.get_lsn_range().end > pitr_cutoff_lsn {
debug!(
"keeping {} because it's newer than pitr_cutoff_lsn {}",
l.filename().display(),
pitr_cutoff_lsn
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
// 3. Is it needed by a child branch?
// NOTE With that wee would keep data that
// might be referenced by child branches forever.

View File

@@ -0,0 +1,77 @@
import subprocess
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.zenith_fixtures import ZenithEnvBuilder
#
# Check pitr_interval GC behavior.
# Insert some data, run GC and create a branch in the past.
#
def test_pitr_gc(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
# Set pitr interval such that we need to keep the data
zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '1day', gc_horizon = 0}"
env = zenith_env_builder.init_start()
pgmain = env.postgres.create_start('main')
log.info("postgres is running on 'main' branch")
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
main_cur.execute("SHOW zenith.zenith_timeline")
timeline = main_cur.fetchone()[0]
# Create table
main_cur.execute('CREATE TABLE foo (t text)')
for i in range(10000):
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space';
''')
if i == 99:
# keep some early lsn to test branch creation after GC
main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
res = main_cur.fetchone()
lsn_a = res[0]
xid_a = res[1]
log.info(f'LSN after 100 rows: {lsn_a} xid {xid_a}')
main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
res = main_cur.fetchone()
debug_lsn = res[0]
debug_xid = res[1]
log.info(f'LSN after 10000 rows: {debug_lsn} xid {debug_xid}')
# run GC
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
pscur.execute(f"compact {env.initial_tenant.hex} {timeline}")
# perform agressive GC. Data still should be kept because of the PITR setting.
pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
# Branch at the point where only 100 rows were inserted
# It must have been preserved by PITR setting
env.zenith_cli.create_branch('test_pitr_gc_hundred', 'main', ancestor_start_lsn=lsn_a)
pg_hundred = env.postgres.create_start('test_pitr_gc_hundred')
# On the 'hundred' branch, we should see only 100 rows
hundred_pg_conn = pg_hundred.connect()
hundred_cur = hundred_pg_conn.cursor()
hundred_cur.execute('SELECT count(*) FROM foo')
assert hundred_cur.fetchone() == (100, )
# All the rows are visible on the main branch
main_cur.execute('SELECT count(*) FROM foo')
assert main_cur.fetchone() == (10000, )

View File

@@ -75,7 +75,8 @@ def lsn_from_hex(lsn_hex: str) -> int:
def print_gc_result(row):
log.info("GC duration {elapsed} ms".format_map(row))
log.info(
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}"
" needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
.format_map(row))