Add test for Garbage Collection.

This exposes a command in the page server to run GC immediately on a given
timeline. It's just for testing purposes.
Author: Heikki Linnakangas
Date: 2021-06-28 17:07:28 +03:00
parent a31bba19b0
commit ec44f4b299
4 changed files with 178 additions and 18 deletions
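
For orientation before the diffs: a minimal sketch of invoking the new do_gc command by hand over the page server's libpq endpoint. The host, port, and timeline ID here are placeholders, not values from this commit; the test at the end of the commit gets its connection from the `pageserver` fixture instead.

from contextlib import closing

import psycopg2
import psycopg2.extras

# Placeholders: point this at your page server, and use the timeline ID
# reported by "SHOW zenith.zenith_timeline" on the compute node.
PAGESERVER_DSN = "host=localhost port=64000"
TIMELINE_ID = "<timeline-id>"

with closing(psycopg2.connect(PAGESERVER_DSN)) as conn:
    # Avoid an implicit BEGIN; the page server only understands its own
    # ad-hoc commands, not regular SQL.
    conn.autocommit = True
    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        # Syntax: do_gc <timeline> [horizon]. If the horizon is omitted,
        # the handler falls back to the configured gc_horizon.
        cur.execute(f"do_gc {TIMELINE_ID} 0")
        row = cur.fetchone()
        print("relations: {n_relations}, truncated: {truncated}, "
              "deleted: {deleted}, dropped: {dropped}, "
              "elapsed: {elapsed} ms".format_map(row))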

pageserver/src/object_repository.rs

@@ -532,29 +532,15 @@ impl Timeline for ObjectTimeline {
last_relation_size: None,
}))
}
-}
-///
-/// Result of performing GC
-///
-#[derive(Default)]
-struct GcResult {
-n_relations: u64,
-truncated: u64,
-deleted: u64,
-dropped: u64,
-elapsed: Duration,
-}
-impl ObjectTimeline {
-fn gc_iteration(&self, horizon: u64) -> Result<()> {
+fn gc_iteration(&self, horizon: u64) -> Result<GcResult> {
let last_lsn = self.get_last_valid_lsn();
+let mut result: GcResult = Default::default();
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(horizon) {
// WAL is large enough to perform GC
let now = Instant::now();
-let mut result: GcResult = Default::default();
// Iterate through all relations
for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? {
@@ -625,9 +611,11 @@ impl ObjectTimeline {
result.elapsed, &result.n_relations, &result.truncated, &result.deleted, &result.dropped);
self.obj_store.compact();
}
-Ok(())
+Ok(result)
}
}
+impl ObjectTimeline {
fn get_page_at_lsn_nowait(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes> {
// Look up the page entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.

pageserver/src/page_service.rs

@@ -21,7 +21,9 @@ use std::thread;
use std::{io, net::TcpStream};
use zenith_utils::postgres_backend;
use zenith_utils::postgres_backend::PostgresBackend;
-use zenith_utils::pq_proto::{BeMessage, FeMessage, HELLO_WORLD_ROW, SINGLE_COL_ROWDESC};
+use zenith_utils::pq_proto::{
+BeMessage, FeMessage, RowDescriptor, HELLO_WORLD_ROW, SINGLE_COL_ROWDESC,
+};
use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
use crate::basebackup;
@@ -500,6 +502,70 @@ impl postgres_backend::Handler for PageServerHandler {
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?;
pgb.write_message_noflush(&BeMessage::DataRow(&[Some(system_id.as_bytes())]))?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with(b"do_gc ") {
// Run GC immediately on the given timeline.
// FIXME: This is just for tests. See test_runner/batch_others/test_gc.py.
// This probably should require special authentication or a global flag to
// enable; I don't think we want to or need to allow regular clients to invoke
// GC.
let query_str = std::str::from_utf8(&query_string)?;
let mut it = query_str.split(' ');
it.next().unwrap();
let timeline_id: ZTimelineId = it
.next()
.ok_or_else(|| anyhow!("missing timeline id"))?
.parse()?;
let timeline = page_cache::get_repository().get_timeline(timeline_id)?;
let horizon: u64 = it
.next()
.unwrap_or(&self.conf.gc_horizon.to_string())
.parse()?;
let result = timeline.gc_iteration(horizon)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor {
name: b"n_relations",
typoid: 20,
typlen: 8,
..Default::default()
},
RowDescriptor {
name: b"truncated",
typoid: 20,
typlen: 8,
..Default::default()
},
RowDescriptor {
name: b"deleted",
typoid: 20,
typlen: 8,
..Default::default()
},
RowDescriptor {
name: b"dropped",
typoid: 20,
typlen: 8,
..Default::default()
},
RowDescriptor {
name: b"elapsed",
typoid: 20,
typlen: 8,
..Default::default()
},
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(&result.n_relations.to_string().as_bytes()),
Some(&result.truncated.to_string().as_bytes()),
Some(&result.deleted.to_string().as_bytes()),
Some(&result.dropped.to_string().as_bytes()),
Some(&result.elapsed.as_millis().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else {
bail!("unknown command");
}
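
A note on the response built above: typoid 20 with typlen 8 is Postgres's int8 (bigint), so all five GcResult counters arrive at the client as 64-bit integer columns, with elapsed converted to milliseconds by the handler.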

pageserver/src/repository.rs

@@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use zenith_utils::lsn::Lsn;
///
@@ -28,6 +29,18 @@ pub trait Repository: Send + Sync {
//fn get_stats(&self) -> RepositoryStats;
}
///
/// Result of performing GC
///
#[derive(Default)]
pub struct GcResult {
pub n_relations: u64,
pub truncated: u64,
pub deleted: u64,
pub dropped: u64,
pub elapsed: Duration,
}
pub trait Timeline: Send + Sync {
//------------------------------------------------------------------------------
// Public GET functions
@@ -94,6 +107,13 @@ pub trait Timeline: Send + Sync {
/// Relation size is increased implicitly and decreased with Truncate updates.
// TODO ordering guarantee?
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
/// Perform one garbage collection iteration.
/// Garbage collection is performed periodically by the GC thread,
/// but it can also be requested explicitly through the page server API.
///
/// `horizon` specifies the delta from the last LSN within which all object
/// versions are preserved (the PITR interval).
fn gc_iteration(&self, horizon: u64) -> Result<GcResult>;
}
pub trait History: Iterator<Item = Result<RelationUpdate>> {
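
To make the horizon semantics concrete: a small illustration of the cutoff computation, mirroring the checked_sub() call in gc_iteration above (the LSN values are made up).

# Page versions newer than (last_valid_lsn - horizon) are preserved. If less
# WAL than `horizon` has been written so far, the subtraction would underflow
# (checked_sub() returns None) and the GC iteration does nothing.
def gc_cutoff(last_valid_lsn: int, horizon: int):
    return last_valid_lsn - horizon if last_valid_lsn >= horizon else None

assert gc_cutoff(0x2000000, 0x1000000) == 0x1000000  # collect below LSN 0/1000000
assert gc_cutoff(0x0800000, 0x1000000) is None       # not enough WAL yet; GC skipped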

test_runner/batch_others/test_gc.py

@@ -0,0 +1,86 @@
from contextlib import closing
import psycopg2.extras
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test Garbage Collection of old page versions.
#
# This test is pretty tightly coupled with the current implementation of page version storage
# and garbage collection in object_repository.rs.
#
def test_gc(zenith_cli, pageserver, postgres, pg_bin):
    zenith_cli.run(["branch", "test_gc", "empty"])
    pg = postgres.create_start('test_gc')

    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            with closing(pageserver.connect()) as psconn:
                with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
                    # Get the timeline ID of our branch. We need it for the 'do_gc' command
                    cur.execute("SHOW zenith.zenith_timeline")
                    timeline = cur.fetchone()[0]

                    # Create a test table
                    cur.execute("CREATE TABLE foo(x integer)")

                    # Run GC, to clear out any old page versions left behind in the catalogs by
                    # the CREATE TABLE command. We want to have a clean slate with no garbage
                    # before running the actual tests below, otherwise the counts won't match
                    # what we expect.
                    print("Running GC before test")
                    pscur.execute(f"do_gc {timeline} 0")
                    row = pscur.fetchone()
                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))

                    # remember the number of relations
                    n_relations = row['n_relations']
                    assert n_relations > 0

                    # Insert a row. The first insert will also create a metadata entry for the
                    # relation, with size == 1 block. Hence, bump up the expected relation count.
                    n_relations += 1
                    print("Inserting one row and running GC")
                    cur.execute("INSERT INTO foo VALUES (1)")
                    pscur.execute(f"do_gc {timeline} 0")
                    row = pscur.fetchone()
                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
                    assert row['n_relations'] == n_relations
                    assert row['dropped'] == 0
                    assert row['truncated'] == 1
                    assert row['deleted'] == 1

                    # Insert two more rows and run GC.
                    print("Inserting two more rows and running GC")
                    cur.execute("INSERT INTO foo VALUES (2)")
                    cur.execute("INSERT INTO foo VALUES (3)")
                    pscur.execute(f"do_gc {timeline} 0")
                    row = pscur.fetchone()
                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
                    assert row['n_relations'] == n_relations
                    assert row['dropped'] == 0
                    assert row['truncated'] == 1
                    assert row['deleted'] == 2

                    # Insert one more row. It creates one more page version, but doesn't affect the
                    # relation size.
                    print("Inserting one more row")
                    cur.execute("INSERT INTO foo VALUES (3)")
                    pscur.execute(f"do_gc {timeline} 0")
                    row = pscur.fetchone()
                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
                    assert row['n_relations'] == n_relations
                    assert row['dropped'] == 0
                    assert row['truncated'] == 1
                    assert row['deleted'] == 1

                    # Run GC again, with no changes in the database. Should not remove anything.
                    pscur.execute(f"do_gc {timeline} 0")
                    row = pscur.fetchone()
                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
                    assert row['n_relations'] == n_relations
                    assert row['dropped'] == 0
                    assert row['truncated'] == 0
                    assert row['deleted'] == 0