diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index d770946580..80e4f145ef 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -50,7 +50,9 @@ use crate::span::{
     debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
 };
 use crate::tenant::storage_layer::IoConcurrency;
-use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
+use crate::tenant::timeline::{
+    GetVectoredError, MissingKeyError, RelSizeCacheEntry, VersionedKeySpaceQuery,
+};
 
 /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
 pub const MAX_AUX_FILE_DELTAS: usize = 1024;
@@ -470,8 +472,26 @@ impl Timeline {
             ));
         }
 
-        if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
-            return Ok(nblocks);
+        if let Some(entry) = self.get_cached_rel_size(&tag, version.get_lsn()) {
+            match entry {
+                RelSizeCacheEntry::Present(nblocks) => {
+                    return Ok(nblocks);
+                }
+                RelSizeCacheEntry::Truncated => {
+                    let key = rel_size_to_key(tag);
+                    return Err(PageReconstructError::MissingKey(Box::new(
+                        MissingKeyError {
+                            keyspace: KeySpace::single(key..key.next()),
+                            shard: self.get_shard_identity().number,
+                            query: None,
+                            original_hwm_lsn: version.get_lsn(),
+                            ancestor_lsn: None,
+                            read_path: None,
+                            backtrace: None,
+                        },
+                    )));
+                }
+            }
         }
 
         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
@@ -510,8 +530,15 @@ impl Timeline {
         }
 
         // first try to lookup relation in cache
-        if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
-            return Ok(true);
+        if let Some(entry) = self.get_cached_rel_size(&tag, version.get_lsn()) {
+            match entry {
+                RelSizeCacheEntry::Present(_) => {
+                    return Ok(true);
+                }
+                RelSizeCacheEntry::Truncated => {
+                    return Ok(false);
+                }
+            }
         }
         // then check if the database was already initialized.
         // get_rel_exists can be called before dbdir is created.
@@ -1330,12 +1357,12 @@ impl Timeline {
     }
 
     /// Get cached size of relation if it not updated after specified LSN
-    pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
+    pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<RelSizeCacheEntry> {
         let rel_size_cache = self.rel_size_cache.read().unwrap();
-        if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
+        if let Some((cached_lsn, entry)) = rel_size_cache.map.get(tag) {
             if lsn >= *cached_lsn {
                 RELSIZE_CACHE_HITS.inc();
-                return Some(*nblocks);
+                return Some(*entry);
             }
             RELSIZE_CACHE_MISSES_OLD.inc();
         }
@@ -1359,11 +1386,11 @@
             hash_map::Entry::Occupied(mut entry) => {
                 let cached_lsn = entry.get_mut();
                 if lsn >= cached_lsn.0 {
-                    *cached_lsn = (lsn, nblocks);
+                    *cached_lsn = (lsn, RelSizeCacheEntry::Present(nblocks));
                 }
             }
             hash_map::Entry::Vacant(entry) => {
-                entry.insert((lsn, nblocks));
+                entry.insert((lsn, RelSizeCacheEntry::Present(nblocks)));
                 RELSIZE_CACHE_ENTRIES.inc();
             }
         }
@@ -1372,15 +1399,23 @@
     /// Store cached relation size
     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
-        if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
+        if rel_size_cache
+            .map
+            .insert(tag, (lsn, RelSizeCacheEntry::Present(nblocks)))
+            .is_none()
+        {
             RELSIZE_CACHE_ENTRIES.inc();
         }
     }
 
     /// Remove cached relation size
-    pub fn remove_cached_rel_size(&self, tag: &RelTag) {
+    pub fn remove_cached_rel_size(&self, tag: RelTag, lsn: Lsn) {
         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
-        if rel_size_cache.map.remove(tag).is_some() {
-            RELSIZE_CACHE_ENTRIES.dec();
+        if rel_size_cache
+            .map
+            .insert(tag, (lsn, RelSizeCacheEntry::Truncated))
+            .is_none()
+        {
+            RELSIZE_CACHE_ENTRIES.inc();
         }
     }
@@ -1585,7 +1620,9 @@ impl DatadirModification<'_> {
         // check the cache too. This is because eagerly checking the cache results in
         // less work overall and 10% better performance. It's more work on cache miss
        // but cache miss is rare.
-        if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
+        if let Some(RelSizeCacheEntry::Present(nblocks)) =
+            self.tline.get_cached_rel_size(&rel, self.get_lsn())
+        {
             Ok(nblocks)
         } else if !self
             .tline
@@ -2172,7 +2209,7 @@ impl DatadirModification<'_> {
         self.pending_nblocks -= old_size as i64;
 
         // Remove entry from relation size cache
-        self.tline.remove_cached_rel_size(&rel_tag);
+        self.tline.remove_cached_rel_size(rel_tag, self.lsn);
 
         // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
         self.delete(rel_key_range(rel_tag));
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index d7f5958128..cc948a56a0 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -204,7 +204,15 @@ pub struct TimelineResources {
 /// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
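+/// A cached entry is either the relation's last known size, or a marker that
+/// the relation was truncated or dropped at the cached LSN ([`RelSizeCacheEntry`]).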
 pub(crate) struct RelSizeCache {
     pub(crate) complete_as_of: Lsn,
-    pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
+    pub(crate) map: HashMap<RelTag, (Lsn, RelSizeCacheEntry)>,
+}
+
+#[derive(Debug, Copy, Clone)]
+pub enum RelSizeCacheEntry {
+    Present(BlockNumber),
+    Truncated,
 }
 
 pub struct Timeline {
@@ -690,15 +696,15 @@ impl std::fmt::Display for ReadPath {
 
 #[derive(thiserror::Error)]
 pub struct MissingKeyError {
-    keyspace: KeySpace,
-    shard: ShardNumber,
-    query: Option<VersionedKeySpaceQuery>,
+    pub keyspace: KeySpace,
+    pub shard: ShardNumber,
+    pub query: Option<VersionedKeySpaceQuery>,
     // This is largest request LSN from the get page request batch
-    original_hwm_lsn: Lsn,
-    ancestor_lsn: Option<Lsn>,
+    pub original_hwm_lsn: Lsn,
+    pub ancestor_lsn: Option<Lsn>,
     /// Debug information about the read path if there's an error
-    read_path: Option<ReadPath>,
-    backtrace: Option<std::backtrace::Backtrace>,
+    pub read_path: Option<ReadPath>,
+    pub backtrace: Option<std::backtrace::Backtrace>,
 }
 
 impl MissingKeyError {
diff --git a/test_runner/regress/test_replica_start.py b/test_runner/regress/test_replica_start.py
index e2a22cc769..c88bc7aace 100644
--- a/test_runner/regress/test_replica_start.py
+++ b/test_runner/regress/test_replica_start.py
@@ -27,8 +27,9 @@ from contextlib import closing
 
 import psycopg2
 import pytest
 
+from fixtures.common_types import Lsn
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
+from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup
 from fixtures.pg_version import PgVersion
 from fixtures.utils import query_scalar, skip_on_postgres, wait_until
@@ -695,3 +696,111 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
     with secondary.cursor() as secondary_cur:
         secondary_cur.execute("select count(*) from t")
         assert secondary_cur.fetchone() == (n_restarts,)
+
+
+def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin):
+    env = neon_simple_env
+    endpoint = env.endpoints.create_start("main")
+
+    sql = """
+CREATE TABLE CHAR_TBL(f1 char(4));
+CREATE TABLE FLOAT8_TBL(f1 float8);
+CREATE TABLE INT2_TBL(f1 int2);
+CREATE TABLE INT4_TBL(f1 int4);
+CREATE TABLE INT8_TBL(q1 int8, q2 int8);
+CREATE TABLE POINT_TBL(f1 point);
+CREATE TABLE TEXT_TBL (f1 text);
+CREATE TABLE VARCHAR_TBL(f1 varchar(4));
+CREATE TABLE onek (unique1 int4);
+CREATE TABLE onek2 AS SELECT * FROM onek;
+CREATE TABLE tenk1 (unique1 int4);
+CREATE TABLE tenk2 AS SELECT * FROM tenk1;
+CREATE TABLE person (name text, age int4,location point);
+CREATE TABLE emp (salary int4, manager name) INHERITS (person);
+CREATE TABLE student (gpa float8) INHERITS (person);
+CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student);
+CREATE TABLE road (name text,thepath path);
+CREATE TABLE ihighway () INHERITS (road);
+CREATE TABLE shighway(surface text) INHERITS (road);
+CREATE TABLE BOOLTBL3 (d text, b bool, o int);
+CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool);
+DROP TABLE BOOLTBL3;
+DROP TABLE BOOLTBL4;
+CREATE TABLE ceil_floor_round (a numeric);
+DROP TABLE ceil_floor_round;
+CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8);
+DROP TABLE width_bucket_test;
+CREATE TABLE num_input_test (n1 numeric);
+CREATE TABLE num_variance (a numeric);
+INSERT INTO num_variance VALUES (0);
+CREATE TABLE snapshot_test (nr integer, snap txid_snapshot);
+CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now()));
+CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now()));
+CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
+CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field);
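+-- The TRUNCATE/DROP churn below leaves Truncated rel-size cache entries for the read-only endpoints pinned at earlier LSNs to exercise.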
+TRUNCATE guid1;
+DROP TABLE guid1;
+DROP TABLE guid2 CASCADE;
+CREATE TABLE numrange_test (nr NUMRANGE);
+CREATE INDEX numrange_test_btree on numrange_test(nr);
+CREATE TABLE numrange_test2(nr numrange);
+CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr);
+INSERT INTO numrange_test2 VALUES('[, 5)');
+CREATE TABLE textrange_test (tr text);
+CREATE INDEX textrange_test_btree on textrange_test(tr);
+CREATE TABLE test_range_gist(ir int4range);
+CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
+DROP INDEX test_range_gist_idx;
+CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
+CREATE TABLE test_range_spgist(ir int4range);
+CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
+DROP INDEX test_range_spgist_idx;
+CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
+CREATE TABLE test_range_elem(i int4);
+CREATE INDEX test_range_elem_idx on test_range_elem (i);
+CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10));
+DROP TABLE test_range_elem;
+CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&));
+CREATE TABLE f_test(f text, i int);
+CREATE TABLE i8r_array (f1 int, f2 text);
+CREATE TYPE arrayrange as range (subtype=int4[]);
+CREATE TYPE two_ints as (a int, b int);
+DROP TYPE two_ints cascade;
+CREATE TABLE text_support_test (t text);
+CREATE TABLE TEMP_FLOAT (f1 FLOAT8);
+CREATE TABLE TEMP_INT4 (f1 INT4);
+CREATE TABLE TEMP_INT2 (f1 INT2);
+CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8);
+CREATE TABLE POLYGON_TBL(f1 polygon);
+CREATE TABLE quad_poly_tbl (id int, p polygon);
+INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y;
+CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl;
+CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl;
+"""
+
+    with endpoint.cursor() as cur:
+        lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+        env.endpoints.create_start(branch_name="main", lsn=lsn)
+        log.info(f"lsn: {lsn}")
+
+        for line in sql.split("\n"):
+            if len(line.strip()) == 0 or line.startswith("--"):
+                continue
+            cur.execute(line)
+
+        lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+        env.endpoints.create_start(branch_name="main", lsn=lsn)
+        log.info(f"lsn: {lsn}")
+
+        cur.execute("VACUUM FULL pg_class;")
+
+    for ep in env.endpoints.endpoints:
+        log.info(f"{ep.endpoint_id} / {ep.pg_port}")
+        pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"]
+        env_vars = {
+            "PGPORT": str(ep.pg_port),
+            "PGUSER": endpoint.default_options["user"],
+            "PGHOST": endpoint.default_options["host"],
+        }
+        pg_bin.run_capture(pg_dump_command, env=env_vars)
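
For readers outside the pageserver code, the following is a minimal, self-contained sketch of the lookup protocol this patch introduces. It is not pageserver code: `RelTag` and `Lsn` are simplified to plain integers, and `set`/`remove`/`get` are illustrative stand-ins for `set_cached_rel_size`/`remove_cached_rel_size`/`get_cached_rel_size` (locking and metrics omitted).

use std::collections::HashMap;

type RelTag = u32; // simplified: the real RelTag is (spcnode, dbnode, relnode, forknum)
type Lsn = u64;
type BlockNumber = u32;

#[derive(Debug, Copy, Clone)]
enum RelSizeCacheEntry {
    Present(BlockNumber),
    Truncated,
}

#[derive(Default)]
struct RelSizeCache {
    map: HashMap<RelTag, (Lsn, RelSizeCacheEntry)>,
}

impl RelSizeCache {
    // Remember the relation size observed at `lsn`.
    fn set(&mut self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
        self.map.insert(tag, (lsn, RelSizeCacheEntry::Present(nblocks)));
    }

    // New behavior on truncate/drop: instead of forgetting the relation,
    // store a tombstone so reads at lsn >= truncation LSN can answer
    // "relation is gone" straight from the cache.
    fn remove(&mut self, tag: RelTag, lsn: Lsn) {
        self.map.insert(tag, (lsn, RelSizeCacheEntry::Truncated));
    }

    // An entry only answers reads at or after the LSN it was recorded at;
    // older reads must fall through to storage.
    fn get(&self, tag: RelTag, lsn: Lsn) -> Option<RelSizeCacheEntry> {
        match self.map.get(&tag) {
            Some((cached_lsn, entry)) if lsn >= *cached_lsn => Some(*entry),
            _ => None,
        }
    }
}

fn main() {
    let mut cache = RelSizeCache::default();
    cache.set(1, 100, 8); // relation 1 had 8 blocks as of LSN 100
    cache.remove(1, 200); // relation 1 truncated/dropped at LSN 200

    // A read pinned at LSN 150 misses (the tombstone is newer than the read)
    // and falls back to storage; a read at LSN 250 sees the tombstone, which
    // get_rel_exists maps to Ok(false) and get_rel_size to a MissingKey error.
    assert!(cache.get(1, 150).is_none());
    assert!(matches!(cache.get(1, 250), Some(RelSizeCacheEntry::Truncated)));
}

The key design point is that a truncation is recorded as a tombstone rather than an eviction: the cache keeps answering both "how big" and "does it still exist" without a storage round-trip, while reads pinned before the truncation LSN still fall through to storage and see the historical size.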