Prohibit branch creation at an LSN that was already garbage collected.

This introduces a new timeline field, latest_gc_cutoff, which is
updated before each GC iteration. A new check in branch_timeline
prevents branch creation with a start point earlier than
latest_gc_cutoff. This also adds a check to get_page_at_lsn which
ensures that the LSN at which the page is requested has not been
garbage collected. This check is currently triggered by read-only
nodes that are pinned to a specific LSN: because such nodes are not
tracked by pageserver garbage collection, GC can remove data they
still reference. This is a bug and will be fixed separately.
Author: Dmitry Rodionov
Date: 2021-11-12 17:18:56 +03:00
Committed by: Dmitry Rodionov
Parent: 298bc588f9
Commit: 44111e3ba3
9 changed files with 187 additions and 33 deletions
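
Before the per-file diffs, a minimal, self-contained sketch of the guard pattern this commit introduces may help. It models the pageserver's AtomicLsn with a plain AtomicU64; all names here are illustrative stand-ins rather than the real API:

    use std::sync::atomic::{AtomicU64, Ordering};

    use anyhow::{ensure, Result};

    struct Timeline {
        // Highest LSN below which layer data may already have been removed by GC.
        latest_gc_cutoff: AtomicU64,
    }

    impl Timeline {
        // Called at the start of every GC iteration, before any layer is deleted,
        // so concurrent branch/page requests observe the new horizon first.
        fn begin_gc_iteration(&self, cutoff: u64) {
            self.latest_gc_cutoff.store(cutoff, Ordering::SeqCst);
        }

        // Called from branch creation and page lookups: refuse any request that
        // reaches below the horizon, because the data may already be gone.
        fn check_lsn_is_in_scope(&self, lsn: u64) -> Result<()> {
            let cutoff = self.latest_gc_cutoff.load(Ordering::SeqCst);
            ensure!(
                lsn >= cutoff,
                "LSN {} is earlier than latest GC horizon {}",
                lsn,
                cutoff
            );
            Ok(())
        }
    }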


@@ -146,7 +146,7 @@ impl Repository for LayeredRepository {
// Create the timeline directory, and write initial metadata to file.
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
-let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0));
+let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), Lsn(0));
Self::save_metadata(self.conf, timelineid, self.tenantid, &metadata, true)?;
let timeline = LayeredTimeline::new(
@@ -168,7 +168,19 @@ impl Repository for LayeredRepository {
/// Branch a timeline
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()> {
-let src_timeline = self.get_timeline(src)?;
+// We need to hold this lock to prevent GC from starting at the same time. GC scans the directory to learn
+// about timelines, so otherwise a race condition is possible, where we create a new timeline and GC
+// concurrently removes data that is needed by the new timeline.
+let mut timelines = self.timelines.lock().unwrap();
+let src_timeline = self.get_timeline_locked(src, &mut timelines)?;
+let latest_gc_cutoff = src_timeline.latest_gc_cutoff.load();
+ensure!(
+start_lsn >= latest_gc_cutoff,
+"Branch start LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
+start_lsn,
+latest_gc_cutoff,
+);
let RecordLsn {
last: src_last,
@@ -185,11 +197,20 @@ impl Repository for LayeredRepository {
// Create the metadata file, noting the ancestor of the new timeline.
// There is initially no data in it, but all the read-calls know to look
// into the ancestor.
-let metadata = TimelineMetadata::new(start_lsn, dst_prev, Some(src), start_lsn);
+let metadata = TimelineMetadata::new(
+start_lsn,
+dst_prev,
+Some(src),
+start_lsn,
+src_timeline.latest_gc_cutoff.load(),
+);
crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenantid))?;
Self::save_metadata(self.conf, dst, self.tenantid, &metadata, true)?;
info!("branched timeline {} from {} at {}", dst, src, start_lsn);
info!(
"branched timeline {} from {} at {} latest_gc_cutoff {}",
dst, src, start_lsn, latest_gc_cutoff
);
Ok(())
}
@@ -552,6 +573,9 @@ pub struct LayeredTimeline {
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: Mutex<()>,
+// Needed to ensure that we can't create a branch at a point that was already garbage collected
+latest_gc_cutoff: AtomicLsn,
}
/// Public interface functions
@@ -591,6 +615,13 @@ impl Timeline for LayeredTimeline {
);
}
debug_assert!(lsn <= self.get_last_record_lsn());
+let latest_gc_cutoff = self.latest_gc_cutoff.load();
+// error instead of assert to simplify testing
+ensure!(
+lsn >= latest_gc_cutoff,
+"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
+lsn, latest_gc_cutoff
+);
let seg = SegmentTag::from_blknum(rel, blknum);
@@ -857,6 +888,8 @@ impl LayeredTimeline {
upload_relishes,
write_lock: Mutex::new(()),
+latest_gc_cutoff: AtomicLsn::from(metadata.latest_gc_cutoff()),
};
Ok(timeline)
}
@@ -1237,6 +1270,7 @@ impl LayeredTimeline {
ondisk_prev_record_lsn,
ancestor_timelineid,
self.ancestor_lsn,
+self.latest_gc_cutoff.load(),
);
LayeredRepository::save_metadata(
@@ -1334,6 +1368,10 @@ impl LayeredTimeline {
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
+// We need to ensure that no one branches at a point before latest_gc_cutoff.
+// See branch_timeline() for details.
+self.latest_gc_cutoff.store(cutoff);
info!("GC starting");
debug!("retain_lsns: {:?}", retain_lsns);
@@ -1495,11 +1533,12 @@ impl LayeredTimeline {
// We didn't find any reason to keep this file, so remove it.
info!(
"garbage collecting {} {}-{} {}",
"garbage collecting {} {}-{} is_dropped: {} is_incremental: {}",
l.get_seg_tag(),
l.get_start_lsn(),
l.get_end_lsn(),
-l.is_dropped()
+l.is_dropped(),
+l.is_incremental(),
);
layers_to_remove.push(Arc::clone(&l));
}
@@ -1582,12 +1621,19 @@ impl LayeredTimeline {
let mut layer_ref = layer;
let mut curr_lsn = lsn;
loop {
-match layer_ref.get_page_reconstruct_data(
-blknum,
-curr_lsn,
-cached_lsn_opt,
-&mut data,
-)? {
+let result = layer_ref
+.get_page_reconstruct_data(blknum, curr_lsn, cached_lsn_opt, &mut data)
+.with_context(|| {
+format!(
+"Failed to get reconstruct data {} {:?} {} {} {:?}",
+layer_ref.get_seg_tag(),
+layer_ref.filename(),
+blknum,
+curr_lsn,
+cached_lsn_opt,
+)
+})?;
+match result {
PageReconstructResult::Complete => break,
PageReconstructResult::Continue(cont_lsn) => {
// Fetch base image / more WAL from the returned predecessor layer
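
The last hunk replaces a bare `?` with anyhow's with_context, so a reconstruct failure now reports which segment, layer file, block, and LSN were being read. A stripped-down sketch of that idiom, with a hypothetical stand-in for the layer reader:

    use anyhow::{bail, Context, Result};

    // Hypothetical stand-in for Layer::get_page_reconstruct_data.
    fn read_layer(_blknum: u32, _lsn: u64) -> Result<Vec<u8>> {
        bail!("page not found")
    }

    fn read_with_diagnostics(layer_name: &str, blknum: u32, lsn: u64) -> Result<Vec<u8>> {
        // with_context is lazy: the format! runs only on the error path.
        read_layer(blknum, lsn).with_context(|| {
            format!(
                "Failed to get reconstruct data {} {} {}",
                layer_name, blknum, lsn
            )
        })
    }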


@@ -42,6 +42,7 @@ pub struct TimelineMetadata {
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
+latest_gc_cutoff: Lsn,
}
/// Points to a place in pageserver's local directory,
@@ -61,12 +62,14 @@ impl TimelineMetadata {
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
+latest_gc_cutoff: Lsn,
) -> Self {
Self {
disk_consistent_lsn,
prev_record_lsn,
ancestor_timeline,
ancestor_lsn,
+latest_gc_cutoff,
}
}
@@ -121,6 +124,10 @@ impl TimelineMetadata {
pub fn ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
}
+pub fn latest_gc_cutoff(&self) -> Lsn {
+self.latest_gc_cutoff
+}
}
/// This module is for direct conversion of metadata to bytes and back.
@@ -139,6 +146,7 @@ mod serialize {
prev_record_lsn: &'a Option<Lsn>,
ancestor_timeline: &'a Option<ZTimelineId>,
ancestor_lsn: &'a Lsn,
latest_gc_cutoff: &'a Lsn,
}
impl<'a> From<&'a TimelineMetadata> for SeTimelineMetadata<'a> {
@@ -148,6 +156,7 @@ mod serialize {
prev_record_lsn: &other.prev_record_lsn,
ancestor_timeline: &other.ancestor_timeline,
ancestor_lsn: &other.ancestor_lsn,
latest_gc_cutoff: &other.latest_gc_cutoff,
}
}
}
@@ -158,6 +167,7 @@ mod serialize {
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
+latest_gc_cutoff: Lsn,
}
impl From<DeTimelineMetadata> for TimelineMetadata {
@@ -167,6 +177,7 @@ mod serialize {
prev_record_lsn: other.prev_record_lsn,
ancestor_timeline: other.ancestor_timeline,
ancestor_lsn: other.ancestor_lsn,
+latest_gc_cutoff: other.latest_gc_cutoff,
}
}
}
@@ -185,6 +196,7 @@ mod tests {
prev_record_lsn: Some(Lsn(0x100)),
ancestor_timeline: Some(TIMELINE_ID),
ancestor_lsn: Lsn(0),
+latest_gc_cutoff: Lsn(0),
};
let metadata_bytes = original_metadata
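
As the hunks above show, the metadata file is written and read through parallel SeTimelineMetadata and DeTimelineMetadata mirror structs, which is why the new field has to be threaded through four places. A minimal sketch of that borrow-on-serialize pattern, assuming serde and an abbreviated field set:

    use serde::{Deserialize, Serialize};

    // Owned form used by the rest of the code.
    pub struct Metadata {
        disk_consistent_lsn: u64,
        latest_gc_cutoff: u64,
    }

    // Borrowing mirror used only for writing: serializing needs no clone.
    #[derive(Serialize)]
    struct SeMetadata<'a> {
        disk_consistent_lsn: &'a u64,
        latest_gc_cutoff: &'a u64,
    }

    impl<'a> From<&'a Metadata> for SeMetadata<'a> {
        fn from(other: &'a Metadata) -> Self {
            SeMetadata {
                disk_consistent_lsn: &other.disk_consistent_lsn,
                latest_gc_cutoff: &other.latest_gc_cutoff,
            }
        }
    }

    // Owned mirror used only for reading the file back.
    #[derive(Deserialize)]
    struct DeMetadata {
        disk_consistent_lsn: u64,
        latest_gc_cutoff: u64,
    }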


@@ -1562,6 +1562,6 @@ mod tests {
}
fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
-TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0))
+TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0))
}
}


@@ -769,6 +769,70 @@ mod tests {
Ok(())
}
+fn make_some_layers(tline: &Arc<dyn Timeline>) -> Result<()> {
+{
+let writer = tline.writer();
+// Create a relation on the timeline
+writer.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
+writer.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?;
+writer.advance_last_record_lsn(Lsn(0x30));
+}
+// materialize layers, lsn 20-30
+tline.checkpoint(CheckpointConfig::Forced)?;
+{
+let writer = tline.writer();
+writer.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("foo blk 0 at 4"))?;
+writer.put_page_image(TESTREL_A, 0, Lsn(0x50), TEST_IMG("foo blk 0 at 5"))?;
+writer.advance_last_record_lsn(Lsn(0x50));
+}
+// materialize layers, lsn 40-50
+tline.checkpoint(CheckpointConfig::Forced)
+}
+#[test]
+fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> {
+let repo =
+RepoHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?.load();
+let tline = repo.create_empty_timeline(TIMELINE_ID)?;
+make_some_layers(&tline)?;
+// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
+repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+// try to branch at lsn 25, should fail because we already garbage collected the data
+match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
+Ok(_) => panic!("branching should have failed"),
+Err(err) => assert!(err
+.to_string()
+.contains("we might've already garbage collected needed data")),
+}
+Ok(())
+}
+#[test]
+fn test_prohibit_get_page_at_lsn_for_garbage_collected_pages() -> Result<()> {
+let repo =
+RepoHarness::create("test_prohibit_get_page_at_lsn_for_garbage_collected_pages")?
+.load();
+let tline = repo.create_empty_timeline(TIMELINE_ID)?;
+make_some_layers(&tline)?;
+// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
+repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+match tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x25)) {
+Ok(_) => panic!("request for page should have failed"),
+Err(err) => assert!(err
+.to_string()
+.contains("tried to request a page version that was garbage collected")),
+}
+Ok(())
+}
#[test]
fn corrupt_metadata() -> Result<()> {
const TEST_NAME: &str = "corrupt_metadata";
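
The "50 minus 10" in the comments above reflects how these tests expect the cutoff to be derived: the horizon passed to gc_iteration (0x10) is subtracted from the last record LSN (0x50). A one-line sketch of that arithmetic, under the assumption that this is how the cutoff is picked:

    // Everything strictly below the returned LSN is eligible for collection.
    fn gc_cutoff(last_record_lsn: u64, horizon: u64) -> u64 {
        last_record_lsn.saturating_sub(horizon) // 0x50 - 0x10 = 0x40
    }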


@@ -1,6 +1,11 @@
import subprocess
-from fixtures.zenith_fixtures import ZenithEnv
+from contextlib import closing
+import psycopg2.extras
+import pytest
from fixtures.log_helper import log
+from fixtures.utils import print_gc_result
+from fixtures.zenith_fixtures import ZenithEnv
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -19,8 +24,16 @@ def test_branch_behind(zenith_simple_env: ZenithEnv):
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
main_cur.execute("SHOW zenith.zenith_timeline")
timeline = main_cur.fetchone()[0]
# Create table, and insert the first 100 rows
main_cur.execute('CREATE TABLE foo (t text)')
+# keep some early lsn to test branch creation at an out-of-date lsn
+main_cur.execute('SELECT pg_current_wal_insert_lsn()')
+gced_lsn = main_cur.fetchone()[0]
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space' || g
@@ -94,3 +107,15 @@ def test_branch_behind(zenith_simple_env: ZenithEnv):
# FIXME: assert false, "branch with invalid LSN should have failed"
except subprocess.CalledProcessError:
log.info("Branch creation with pre-initdb LSN failed (as expected)")
+# check that we cannot create a branch based on garbage-collected data
+with closing(env.pageserver.connect()) as psconn:
+with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
+# call gc to advance latest_gc_cutoff
+pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
+row = pscur.fetchone()
+print_gc_result(row)
+with pytest.raises(Exception, match="(we might've already garbage collected needed data)"):
+# this gced_lsn is pretty random, so if gc is disabled this wouldn't fail
+env.zenith_cli(["branch", "test_branch_create_fail", f"test_branch_behind@{gced_lsn}"])


@@ -1,4 +1,5 @@
import subprocess
+import pytest
from fixtures.log_helper import log
from fixtures.zenith_fixtures import ZenithEnv
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -15,13 +16,14 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
env.zenith_cli(["branch", "test_readonly_node", "empty"])
pgmain = env.postgres.create_start('test_readonly_node')
print("postgres is running on 'test_readonly_node' branch")
log.info("postgres is running on 'test_readonly_node' branch")
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
# Create table, and insert the first 100 rows
main_cur.execute('CREATE TABLE foo (t text)')
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space' || g
@@ -29,7 +31,7 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_a = main_cur.fetchone()[0]
-print('LSN after 100 rows: ' + lsn_a)
+log.info('LSN after 100 rows: ' + lsn_a)
# Insert some more rows. (This generates enough WAL to fill a few segments.)
main_cur.execute('''
@@ -39,7 +41,7 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_b = main_cur.fetchone()[0]
-print('LSN after 200100 rows: ' + lsn_b)
+log.info('LSN after 200100 rows: ' + lsn_b)
# Insert many more rows. This generates enough WAL to fill a few segments.
main_cur.execute('''
@@ -50,7 +52,7 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_c = main_cur.fetchone()[0]
-print('LSN after 400100 rows: ' + lsn_c)
+log.info('LSN after 400100 rows: ' + lsn_c)
# Create first read-only node at the point where only 100 rows were inserted
pg_hundred = env.postgres.create_start("test_readonly_node_hundred",
@@ -84,8 +86,6 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
assert cur.fetchone() == (1, )
# Create node at pre-initdb lsn
-try:
+with pytest.raises(Exception, match='extracting base backup failed'):
# compute node startup with invalid LSN should fail
env.zenith_cli(["pg", "start", "test_branch_preinitdb", "test_readonly_node@0/42"])
-assert False, "compute node startup with invalid LSN should have failed"
-except Exception:
-print("Node creation with pre-initdb LSN failed (as expected)")


@@ -1,22 +1,13 @@
from contextlib import closing
import psycopg2.extras
import time
+from fixtures.utils import print_gc_result
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
-def print_gc_result(row):
-log.info("GC duration {elapsed} ms".format_map(row))
-log.info(
-" REL total: {layer_relfiles_total}, needed_by_cutoff {layer_relfiles_needed_by_cutoff}, needed_by_branches: {layer_relfiles_needed_by_branches}, not_updated: {layer_relfiles_not_updated}, needed_as_tombstone {layer_relfiles_needed_as_tombstone}, removed: {layer_relfiles_removed}, dropped: {layer_relfiles_dropped}"
-.format_map(row))
-log.info(
-" NONREL total: {layer_nonrelfiles_total}, needed_by_cutoff {layer_nonrelfiles_needed_by_cutoff}, needed_by_branches: {layer_nonrelfiles_needed_by_branches}, not_updated: {layer_nonrelfiles_not_updated}, needed_as_tombstone {layer_nonrelfiles_needed_as_tombstone}, removed: {layer_nonrelfiles_removed}, dropped: {layer_nonrelfiles_dropped}"
-.format_map(row))
#
# Test Garbage Collection of old layer files
#


@@ -69,3 +69,13 @@ def lsn_from_hex(lsn_hex: str) -> int:
""" Convert lsn from hex notation to int. """
l, r = lsn_hex.split('/')
return (int(l, 16) << 32) + int(r, 16)
+def print_gc_result(row):
+log.info("GC duration {elapsed} ms".format_map(row))
+log.info(
+" REL total: {layer_relfiles_total}, needed_by_cutoff {layer_relfiles_needed_by_cutoff}, needed_by_branches: {layer_relfiles_needed_by_branches}, not_updated: {layer_relfiles_not_updated}, needed_as_tombstone {layer_relfiles_needed_as_tombstone}, removed: {layer_relfiles_removed}, dropped: {layer_relfiles_dropped}"
+.format_map(row))
+log.info(
+" NONREL total: {layer_nonrelfiles_total}, needed_by_cutoff {layer_nonrelfiles_needed_by_cutoff}, needed_by_branches: {layer_nonrelfiles_needed_by_branches}, not_updated: {layer_nonrelfiles_not_updated}, needed_as_tombstone {layer_nonrelfiles_needed_as_tombstone}, removed: {layer_nonrelfiles_removed}, dropped: {layer_nonrelfiles_dropped}"
+.format_map(row))


@@ -203,6 +203,12 @@ impl AtomicLsn {
}
}
+impl From<Lsn> for AtomicLsn {
+fn from(lsn: Lsn) -> Self {
+Self::new(lsn.0)
+}
+}
/// Pair of LSN's pointing to the end of the last valid record and previous one
#[derive(Debug, Clone, Copy)]
pub struct RecordLsn {
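
To close, a small runnable sketch of why the From impl above is convenient: LayeredTimeline::new can initialize its cutoff directly from the metadata's Lsn. The Lsn/AtomicLsn bodies below are simplified stand-ins for the real zenith_utils types:

    use std::sync::atomic::{AtomicU64, Ordering};

    #[derive(Clone, Copy, Debug, PartialEq)]
    struct Lsn(u64);

    struct AtomicLsn(AtomicU64);

    impl AtomicLsn {
        fn new(val: u64) -> Self {
            AtomicLsn(AtomicU64::new(val))
        }
        fn load(&self) -> Lsn {
            Lsn(self.0.load(Ordering::SeqCst))
        }
    }

    impl From<Lsn> for AtomicLsn {
        fn from(lsn: Lsn) -> Self {
            Self::new(lsn.0)
        }
    }

    fn main() {
        // As in LayeredTimeline::new above: seed the cutoff from on-disk metadata.
        let latest_gc_cutoff = AtomicLsn::from(Lsn(0x30));
        assert_eq!(latest_gc_cutoff.load(), Lsn(0x30));
    }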