diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs
index 1a8af822bb..bb7dba43f7 100644
--- a/pageserver/src/object_repository.rs
+++ b/pageserver/src/object_repository.rs
@@ -532,29 +532,15 @@ impl Timeline for ObjectTimeline {
             last_relation_size: None,
         }))
     }
-}
-
-///
-/// Result of performing GC
-///
-#[derive(Default)]
-struct GcResult {
-    n_relations: u64,
-    truncated: u64,
-    deleted: u64,
-    dropped: u64,
-    elapsed: Duration,
-}
-
-impl ObjectTimeline {
-    fn gc_iteration(&self, horizon: u64) -> Result<()> {
+
+    fn gc_iteration(&self, horizon: u64) -> Result<GcResult> {
         let last_lsn = self.get_last_valid_lsn();
+        let mut result: GcResult = Default::default();
 
         // checked_sub() returns None on overflow.
         if let Some(horizon) = last_lsn.checked_sub(horizon) {
             // WAL is large enough to perform GC
             let now = Instant::now();
-            let mut result: GcResult = Default::default();
 
             // Iterate through all relations
             for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? {
@@ -625,9 +611,11 @@ impl ObjectTimeline {
                 result.elapsed, &result.n_relations, &result.truncated, &result.deleted, &result.dropped);
             self.obj_store.compact();
         }
-        Ok(())
+        Ok(result)
     }
+}
 
+impl ObjectTimeline {
     fn get_page_at_lsn_nowait(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes> {
         // Look up the page entry. If it's a page image, return that. If it's a WAL record,
         // ask the WAL redo service to reconstruct the page image from the WAL records.
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index bd5a1841e2..59b7a8bcab 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -21,7 +21,9 @@ use std::thread;
 use std::{io, net::TcpStream};
 use zenith_utils::postgres_backend;
 use zenith_utils::postgres_backend::PostgresBackend;
-use zenith_utils::pq_proto::{BeMessage, FeMessage, HELLO_WORLD_ROW, SINGLE_COL_ROWDESC};
+use zenith_utils::pq_proto::{
+    BeMessage, FeMessage, RowDescriptor, HELLO_WORLD_ROW, SINGLE_COL_ROWDESC,
+};
 use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
 
 use crate::basebackup;
@@ -500,6 +502,70 @@ impl postgres_backend::Handler for PageServerHandler {
             pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?;
             pgb.write_message_noflush(&BeMessage::DataRow(&[Some(system_id.as_bytes())]))?;
             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
+        } else if query_string.starts_with(b"do_gc ") {
+            // Run GC immediately on the given timeline.
+            // FIXME: This is just for tests. See test_runner/batch_others/test_gc.py.
+            // This should probably require special authentication or a global flag
+            // to enable; we don't want or need to allow regular clients to invoke
+            // GC.
+            let query_str = std::str::from_utf8(&query_string)?;
+
+            let mut it = query_str.split(' ');
+            it.next().unwrap();
+
+            let timeline_id: ZTimelineId = it
+                .next()
+                .ok_or_else(|| anyhow!("missing timeline id"))?
+                .parse()?;
+            let timeline = page_cache::get_repository().get_timeline(timeline_id)?;
+
+            let horizon: u64 = it
+                .next()
+                .unwrap_or(&self.conf.gc_horizon.to_string())
+                .parse()?;
+
+            let result = timeline.gc_iteration(horizon)?;
+
+            pgb.write_message_noflush(&BeMessage::RowDescription(&[
+                RowDescriptor {
+                    name: b"n_relations",
+                    typoid: 20,
+                    typlen: 8,
+                    ..Default::default()
+                },
+                RowDescriptor {
+                    name: b"truncated",
+                    typoid: 20,
+                    typlen: 8,
+                    ..Default::default()
+                },
+                RowDescriptor {
+                    name: b"deleted",
+                    typoid: 20,
+                    typlen: 8,
+                    ..Default::default()
+                },
+                RowDescriptor {
+                    name: b"dropped",
+                    typoid: 20,
+                    typlen: 8,
+                    ..Default::default()
+                },
+                RowDescriptor {
+                    name: b"elapsed",
+                    typoid: 20,
+                    typlen: 8,
+                    ..Default::default()
+                },
+            ]))?
+            .write_message_noflush(&BeMessage::DataRow(&[
+                Some(&result.n_relations.to_string().as_bytes()),
+                Some(&result.truncated.to_string().as_bytes()),
+                Some(&result.deleted.to_string().as_bytes()),
+                Some(&result.dropped.to_string().as_bytes()),
+                Some(&result.elapsed.as_millis().to_string().as_bytes()),
+            ]))?
+            .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
         } else {
             bail!("unknown command");
         }
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index b1a1c91277..3bf725bbf9 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
 use std::fmt;
 use std::sync::Arc;
+use std::time::Duration;
 use zenith_utils::lsn::Lsn;
 
 ///
@@ -28,6 +29,18 @@ pub trait Repository: Send + Sync {
     //fn get_stats(&self) -> RepositoryStats;
 }
 
+///
+/// Result of performing GC
+///
+#[derive(Default)]
+pub struct GcResult {
+    pub n_relations: u64,
+    pub truncated: u64,
+    pub deleted: u64,
+    pub dropped: u64,
+    pub elapsed: Duration,
+}
+
 pub trait Timeline: Send + Sync {
     //------------------------------------------------------------------------------
     // Public GET functions
@@ -94,6 +107,13 @@ pub trait Timeline: Send + Sync {
     /// Relation size is increased implicitly and decreased with Truncate updates.
     // TODO ordering guarantee?
     fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
+
+    /// Perform one garbage collection iteration.
+    /// Garbage collection is periodically performed by the GC thread,
+    /// but it can also be requested explicitly through the page server API.
+    ///
+    /// `horizon` specifies the delta from the last LSN within which all object versions are preserved (the PITR interval).
+    fn gc_iteration(&self, horizon: u64) -> Result<GcResult>;
 }
 
 pub trait History: Iterator> {
diff --git a/test_runner/batch_others/test_gc.py b/test_runner/batch_others/test_gc.py
new file mode 100644
index 0000000000..e9235bb697
--- /dev/null
+++ b/test_runner/batch_others/test_gc.py
@@ -0,0 +1,86 @@
+from contextlib import closing
+import psycopg2.extras
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+#
+# Test Garbage Collection of old page versions.
+#
+# This test is pretty tightly coupled with the current implementation of page version storage
+# and garbage collection in object_repository.rs.
+#
+def test_gc(zenith_cli, pageserver, postgres, pg_bin):
+    zenith_cli.run(["branch", "test_gc", "empty"])
+    pg = postgres.create_start('test_gc')
+
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            with closing(pageserver.connect()) as psconn:
+                with psconn.cursor(cursor_factory = psycopg2.extras.DictCursor) as pscur:
+
+                    # Get the timeline ID of our branch. We need it for the 'do_gc' command
+                    cur.execute("SHOW zenith.zenith_timeline")
+                    timeline = cur.fetchone()[0]
+
+                    # Create a test table
+                    cur.execute("CREATE TABLE foo(x integer)")
+
+                    # Run GC, to clear out any old page versions left behind in the catalogs by
+                    # the CREATE TABLE command. We want to have a clean slate with no garbage
+                    # before running the actual tests below, otherwise the counts won't match
+                    # what we expect.
+                    print("Running GC before test")
+                    pscur.execute(f"do_gc {timeline} 0")
+                    row = pscur.fetchone()
+                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
+                    # remember the number of relations
+                    n_relations = row['n_relations']
+                    assert n_relations > 0
+
+                    # Insert a row. The first insert will also create a metadata entry for the
+                    # relation, with size == 1 block. Hence, bump up the expected relation count.
+                    n_relations += 1
+                    print("Inserting one row and running GC")
+                    cur.execute("INSERT INTO foo VALUES (1)")
+                    pscur.execute(f"do_gc {timeline} 0")
+                    row = pscur.fetchone()
+                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
+                    assert row['n_relations'] == n_relations
+                    assert row['dropped'] == 0
+                    assert row['truncated'] == 1
+                    assert row['deleted'] == 1
+
+                    # Insert two more rows and run GC.
+                    print("Inserting two more rows and running GC")
+                    cur.execute("INSERT INTO foo VALUES (2)")
+                    cur.execute("INSERT INTO foo VALUES (3)")
+
+                    pscur.execute(f"do_gc {timeline} 0")
+                    row = pscur.fetchone()
+                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
+                    assert row['n_relations'] == n_relations
+                    assert row['dropped'] == 0
+                    assert row['truncated'] == 1
+                    assert row['deleted'] == 2
+
+                    # Insert one more row. It creates one more page version, but doesn't affect the
+                    # relation size.
+                    print("Inserting one more row")
+                    cur.execute("INSERT INTO foo VALUES (3)")
+
+                    pscur.execute(f"do_gc {timeline} 0")
+                    row = pscur.fetchone()
+                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
+                    assert row['n_relations'] == n_relations
+                    assert row['dropped'] == 0
+                    assert row['truncated'] == 1
+                    assert row['deleted'] == 1
+
+                    # Run GC again, with no changes in the database. Should not remove anything.
+                    pscur.execute(f"do_gc {timeline} 0")
+                    row = pscur.fetchone()
+                    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
+                    assert row['n_relations'] == n_relations
+                    assert row['dropped'] == 0
+                    assert row['truncated'] == 0
+                    assert row['deleted'] == 0
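
For manual experimentation outside the test harness, the new `do_gc` command can be issued over the page server's libpq interface, the same way the test's `pageserver.connect()` fixture does. The sketch below is illustrative only: the connection string, the `run_gc` helper, and the timeline id are placeholders (assumptions, not part of this patch), while the command syntax (`do_gc <timeline> <horizon>`) and the returned columns are the ones added in page_service.rs above. If the horizon argument is omitted, the handler falls back to the page server's configured `gc_horizon`.

```python
from contextlib import closing

import psycopg2
import psycopg2.extras

# Placeholder DSN: point this at the page server's libpq endpoint (host, port
# and user depend on your local setup; they are not defined by this patch).
PAGESERVER_CONNSTR = "host=127.0.0.1 port=64000 dbname=postgres user=zenith_admin"


def run_gc(timeline: str, horizon: int = 0):
    """Run one GC iteration on the given timeline and return the single result row."""
    with closing(psycopg2.connect(PAGESERVER_CONNSTR)) as conn:
        # The page server only speaks a minimal query protocol; skip the
        # implicit BEGIN that psycopg2 would otherwise send before the command.
        conn.autocommit = True
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute(f"do_gc {timeline} {horizon}")
            return cur.fetchone()


if __name__ == "__main__":
    # Hypothetical timeline id; in the test it comes from SHOW zenith.zenith_timeline.
    row = run_gc("5b0baa6f0ba14e4f9d5bbb46d3b46d3b", horizon=0)
    print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, "
          "truncated: {truncated}, deleted: {deleted}".format_map(row))
```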