From ae091c6913066ad6f5ad9ef5a3115fe2ff7d7597 Mon Sep 17 00:00:00 2001
From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com>
Date: Fri, 14 Feb 2025 15:31:54 -0500
Subject: [PATCH] feat(pageserver): store reldir in sparse keyspace (#10593)

## Problem

Part of https://github.com/neondatabase/neon/issues/9516

## Summary of changes

This patch adds support for storing reldir in the sparse keyspace. All
logic is guarded by the `rel_size_v2_enabled` flag: if it is set to
false, the code path is exactly the same as what is currently in prod.

Note that we do not persist the `rel_size_v2_enabled` flag yet; the
logic around persistence will be implemented in the next patch. (That
is: if we enable it, restart the pageserver, and the flag then gets set
to false, we should still read from v2, based on the
`rel_size_v2_migration_status` in the index_part.) The persistence
logic in the next patch will disallow switching from v2 back to v1 via
the config item.

I also refactored the metrics so that they can work with the new reldir
store. Note that this metric was not computed correctly for reldirs
before (see the comments). With the refactor, the value is only
computed once we have an initial value for the reldir size. The
refactor keeps the existing incorrectness of the computation when there
is more than one database.

For the tests, we currently run all of them with v2 enabled; I will set
it back to false and add some v2-specific tests before merging,
probably also v1->v2 migration tests.

---------

Signed-off-by: Alex Chi Z
---
 libs/pageserver_api/src/config.rs     |   4 +-
 libs/pageserver_api/src/key.rs        | 112 ++++++++-
 pageserver/src/pgdatadir_mapping.rs   | 322 +++++++++++++++++++++-----
 pageserver/src/tenant.rs              |   9 +-
 pageserver/src/tenant/config.rs       |   4 +-
 pageserver/src/tenant/timeline.rs     |  54 ++++-
 test_runner/regress/test_relations.py |  68 ++++++
 test_runner/regress/test_tenants.py   |   3 +-
 8 files changed, 507 insertions(+), 69 deletions(-)
 create mode 100644 test_runner/regress/test_relations.py
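With this change, the global `TenantConfigToml` default becomes a plain
`bool`, while the per-tenant override (in `TenantConfOpt`, see the hunk
further down) stays an `Option<bool>`. A minimal sketch of the resulting
resolution; the free function here is illustrative only and not part of the
patch:

```rust
/// Illustrative only: the effective flag is the per-tenant override if set,
/// otherwise the global default -- a single `unwrap_or`, mirroring the
/// `TenantConfOpt` change below.
fn effective_rel_size_v2(tenant_override: Option<bool>, global_default: bool) -> bool {
    tenant_override.unwrap_or(global_default)
}

fn main() {
    // A tenant with no override inherits the global default (false here).
    assert!(!effective_rel_size_v2(None, false));
    // An explicit per-tenant override wins.
    assert!(effective_rel_size_v2(Some(true), false));
}
```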
diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index 79f068a47b..e64052c73d 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -351,7 +351,7 @@ pub struct TenantConfigToml {
 
     /// Enable rel_size_v2 for this tenant. Once enabled, the tenant will persist this information into
     /// `index_part.json`, and it cannot be reversed.
-    pub rel_size_v2_enabled: Option<bool>,
+    pub rel_size_v2_enabled: bool,
 
     // gc-compaction related configs
     /// Enable automatic gc-compaction trigger on this tenant.
@@ -633,7 +633,7 @@ impl Default for TenantConfigToml {
             lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
             timeline_offloading: true,
             wal_receiver_protocol_override: None,
-            rel_size_v2_enabled: None,
+            rel_size_v2_enabled: false,
             gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
             gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
             gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs
index dbd45da314..b88a2e46a1 100644
--- a/libs/pageserver_api/src/key.rs
+++ b/libs/pageserver_api/src/key.rs
@@ -1,10 +1,12 @@
 use anyhow::{bail, Result};
 use byteorder::{ByteOrder, BE};
+use bytes::Bytes;
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::Oid;
 use postgres_ffi::RepOriginId;
 use serde::{Deserialize, Serialize};
 use std::{fmt, ops::Range};
+use utils::const_assert;
 
 use crate::reltag::{BlockNumber, RelTag, SlruKind};
 
@@ -49,6 +51,64 @@ pub const AUX_KEY_PREFIX: u8 = 0x62;
 /// The key prefix of ReplOrigin keys.
 pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
 
+/// The key prefix of db directory keys.
+pub const DB_DIR_KEY_PREFIX: u8 = 0x64;
+
+/// The key prefix of rel directory keys.
+pub const REL_DIR_KEY_PREFIX: u8 = 0x65;
+
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum RelDirExists {
+    Exists,
+    Removed,
+}
+
+#[derive(Debug)]
+pub struct DecodeError;
+
+impl fmt::Display for DecodeError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "invalid marker")
+    }
+}
+
+impl std::error::Error for DecodeError {}
+
+impl RelDirExists {
+    /// The value of the rel directory keys that indicates the existence of a relation.
+    const REL_EXISTS_MARKER: Bytes = Bytes::from_static(b"r");
+
+    pub fn encode(&self) -> Bytes {
+        match self {
+            Self::Exists => Self::REL_EXISTS_MARKER.clone(),
+            Self::Removed => SPARSE_TOMBSTONE_MARKER.clone(),
+        }
+    }
+
+    pub fn decode_option(data: Option<impl AsRef<[u8]>>) -> Result<Self, DecodeError> {
+        match data {
+            Some(marker) if marker.as_ref() == Self::REL_EXISTS_MARKER => Ok(Self::Exists),
+            // Any other marker is invalid
+            Some(_) => Err(DecodeError),
+            None => Ok(Self::Removed),
+        }
+    }
+
+    pub fn decode(data: impl AsRef<[u8]>) -> Result<Self, DecodeError> {
+        let data = data.as_ref();
+        if data == Self::REL_EXISTS_MARKER {
+            Ok(Self::Exists)
+        } else if data == SPARSE_TOMBSTONE_MARKER {
+            Ok(Self::Removed)
+        } else {
+            Err(DecodeError)
+        }
+    }
+}
+
+/// A tombstone in the sparse keyspace, which is an empty buffer.
+pub const SPARSE_TOMBSTONE_MARKER: Bytes = Bytes::from_static(b"");
+
 /// Check if the key falls in the range of metadata keys.
 pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
     key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
@@ -110,6 +170,24 @@ impl Key {
         }
     }
 
+    pub fn rel_dir_sparse_key_range() -> Range<Key> {
+        Key {
+            field1: REL_DIR_KEY_PREFIX,
+            field2: 0,
+            field3: 0,
+            field4: 0,
+            field5: 0,
+            field6: 0,
+        }..Key {
+            field1: REL_DIR_KEY_PREFIX + 1,
+            field2: 0,
+            field3: 0,
+            field4: 0,
+            field5: 0,
+            field6: 0,
+        }
+    }
+
     /// This function checks more extensively what keys we can take on the write path.
     /// If a key beginning with 00 does not have a global/default tablespace OID, it
     /// will be rejected on the write path.
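The markers above are deliberately tiny: an existing relation is the single
byte `b"r"`, and a removed one is the empty-buffer tombstone, so a deleted
entry decodes the same way as a key that was never written. A self-contained
sketch of the round-trip (not part of the patch), assuming the
`RelDirExists`, `DecodeError`, and `SPARSE_TOMBSTONE_MARKER` items above are
in scope:

```rust
use pageserver_api::key::{DecodeError, RelDirExists, SPARSE_TOMBSTONE_MARKER};

fn marker_roundtrip() -> Result<(), DecodeError> {
    // Exists encodes to b"r" and decodes back to itself.
    assert_eq!(
        RelDirExists::decode(RelDirExists::Exists.encode())?,
        RelDirExists::Exists
    );
    // The tombstone (an empty buffer) decodes to Removed...
    assert_eq!(
        RelDirExists::decode(SPARSE_TOMBSTONE_MARKER)?,
        RelDirExists::Removed
    );
    // ...and so does a missing key, which is what `decode_option` captures.
    assert_eq!(
        RelDirExists::decode_option(None::<&[u8]>)?,
        RelDirExists::Removed
    );
    Ok(())
}
```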
@@ -440,6 +518,36 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
     }
 }
 
+#[inline(always)]
+pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key {
+    Key {
+        field1: REL_DIR_KEY_PREFIX,
+        field2: spcnode,
+        field3: dbnode,
+        field4: relnode,
+        field5: forknum,
+        field6: 1,
+    }
+}
+
+pub fn rel_tag_sparse_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
+    Key {
+        field1: REL_DIR_KEY_PREFIX,
+        field2: spcnode,
+        field3: dbnode,
+        field4: 0,
+        field5: 0,
+        field6: 0,
+    }..Key {
+        field1: REL_DIR_KEY_PREFIX,
+        field2: spcnode,
+        field3: dbnode,
+        field4: u32::MAX,
+        field5: u8::MAX,
+        field6: u32::MAX,
+    } // it's fine to exclude the last key because we only use field6 == 1
+}
+
 #[inline(always)]
 pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
     Key {
@@ -734,9 +842,9 @@ impl Key {
         self.field1 == RELATION_SIZE_PREFIX
     }
 
-    pub fn sparse_non_inherited_keyspace() -> Range<Key> {
+    pub const fn sparse_non_inherited_keyspace() -> Range<Key> {
         // The two keys are adjacent; if we have non-adjacent keys in the future, we should return a keyspace
-        debug_assert_eq!(AUX_KEY_PREFIX + 1, REPL_ORIGIN_KEY_PREFIX);
+        const_assert!(AUX_KEY_PREFIX + 1 == REPL_ORIGIN_KEY_PREFIX);
         Key {
             field1: AUX_KEY_PREFIX,
             field2: 0,
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index f2dca8befa..ae2762bd1e 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -23,13 +23,14 @@ use anyhow::{ensure, Context};
 use bytes::{Buf, Bytes, BytesMut};
 use enum_map::Enum;
 use itertools::Itertools;
-use pageserver_api::key::Key;
 use pageserver_api::key::{
     dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
-    relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
-    slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
-    CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
+    rel_tag_sparse_key_range, relmap_file_key, repl_origin_key, repl_origin_key_range,
+    slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key,
+    twophase_file_key, twophase_key_range, CompactKey, RelDirExists, AUX_FILES_KEY, CHECKPOINT_KEY,
+    CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
 };
+use pageserver_api::key::{rel_tag_sparse_key, Key};
 use pageserver_api::keyspace::SparseKeySpace;
 use pageserver_api::record::NeonWalRecord;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
@@ -490,12 +491,33 @@ impl Timeline {
         if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
             return Ok(false);
         }
-        // fetch directory listing
+
+        // Read path: first read the new reldir keyspace. Early return if the relation exists.
+        // Otherwise, read the old reldir keyspace.
+        // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
+
+        if self.get_rel_size_v2_enabled() {
+            // fetch directory listing (new)
+            let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
+            let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
+                .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
+            let exists_v2 = buf == RelDirExists::Exists;
+            // Fast path: if the relation exists in the new format, return true.
+            // TODO: we should have a verification mode that checks both keyspaces
+            // to ensure the relation only exists in one of them.
+            if exists_v2 {
+                return Ok(true);
+            }
+        }
+
+        // fetch directory listing (old)
+
         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
         let buf = version.get(self, key, ctx).await?;
 
         let dir = RelDirectory::des(&buf)?;
-        Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
+        let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
+        Ok(exists_v1)
     }
 
     /// Get a list of all existing relations in given tablespace and database.
@@ -513,12 +535,12 @@ impl Timeline {
         version: Version<'_>,
         ctx: &RequestContext,
     ) -> Result<HashSet<RelTag>, PageReconstructError> {
-        // fetch directory listing
+        // fetch directory listing (old)
         let key = rel_dir_to_key(spcnode, dbnode);
         let buf = version.get(self, key, ctx).await?;
 
         let dir = RelDirectory::des(&buf)?;
-        let rels: HashSet<RelTag> =
+        let rels_v1: HashSet<RelTag> =
             HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
                 spcnode,
                 dbnode,
@@ -526,6 +548,46 @@ impl Timeline {
                 forknum: *forknum,
             }));
 
+        if !self.get_rel_size_v2_enabled() {
+            return Ok(rels_v1);
+        }
+
+        // scan directory listing (new), merge with the old results
+        let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
+        let io_concurrency = IoConcurrency::spawn_from_conf(
+            self.conf,
+            self.gate
+                .enter()
+                .map_err(|_| PageReconstructError::Cancelled)?,
+        );
+        let results = self
+            .scan(
+                KeySpace::single(key_range),
+                version.get_lsn(),
+                ctx,
+                io_concurrency,
+            )
+            .await?;
+        let mut rels = rels_v1;
+        for (key, val) in results {
+            let val = RelDirExists::decode(&val?)
+                .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
+            assert_eq!(key.field6, 1);
+            assert_eq!(key.field2, spcnode);
+            assert_eq!(key.field3, dbnode);
+            let tag = RelTag {
+                spcnode,
+                dbnode,
+                relnode: key.field4,
+                forknum: key.field5,
+            };
+            if val == RelDirExists::Removed {
+                debug_assert!(!rels.contains(&tag), "removed reltag in v2");
+                continue;
+            }
+            let did_not_contain = rels.insert(tag);
+            debug_assert!(did_not_contain, "duplicate reltag in v2");
+        }
         Ok(rels)
     }
 
@@ -1144,7 +1206,11 @@ impl Timeline {
 
         let dense_keyspace = result.to_keyspace();
         let sparse_keyspace = SparseKeySpace(KeySpace {
-            ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
+            ranges: vec![
+                Key::metadata_aux_key_range(),
+                repl_origin_key_range(),
+                Key::rel_dir_sparse_key_range(),
+            ],
         });
 
         if cfg!(debug_assertions) {
@@ -1274,12 +1340,22 @@ pub struct DatadirModification<'a> {
 
     /// For special "directory" keys that store key-value maps, track the size of the map
     /// if it was updated in this modification.
-    pending_directory_entries: Vec<(DirectoryKind, usize)>,
+    pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
 
     /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
     pending_metadata_bytes: usize,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum MetricsUpdate {
+    /// Set the metric to this value
+    Set(u64),
+    /// Increment the metric by this value
+    Add(u64),
+    /// Decrement the metric by this value
+    Sub(u64),
+}
+
 impl DatadirModification<'_> {
     // When a DatadirModification is committed, we do a monolithic serialization of all its contents.  WAL records can
     // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
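`MetricsUpdate` exists because the v2 path can no longer cheaply report an
absolute directory size on every modification: v1 rewrites the whole
`RelDirectory` and therefore knows `rels.len()`, while v2 touches a single
sparse key and only knows the delta. A sketch (not part of the patch) of how
a stream of updates is meant to fold into one gauge, mirroring the
`update_directory_entries_count` change in `timeline.rs` further down:

```rust
/// Fold a MetricsUpdate into a gauge that starts out uninitialized.
/// `Set` establishes an absolute value; `Add`/`Sub` are deltas that are
/// dropped until an absolute value has been observed, matching the
/// `directory_metrics_inited` gating in timeline.rs.
fn fold_metrics_update(gauge: &mut Option<u64>, update: MetricsUpdate) {
    match update {
        MetricsUpdate::Set(v) => *gauge = Some(v),
        MetricsUpdate::Add(v) => {
            if let Some(cur) = gauge.as_mut() {
                *cur += v;
            }
        }
        MetricsUpdate::Sub(v) => {
            if let Some(cur) = gauge.as_mut() {
                // Saturate instead of underflowing, as the timeline code does.
                *cur = cur.saturating_sub(v);
            }
        }
    }
}
```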
@@ -1359,7 +1435,8 @@ impl DatadirModification<'_> {
         let buf = DbDirectory::ser(&DbDirectory {
             dbdirs: HashMap::new(),
         })?;
-        self.pending_directory_entries.push((DirectoryKind::Db, 0));
+        self.pending_directory_entries
+            .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
         self.put(DBDIR_KEY, Value::Image(buf.into()));
 
         let buf = if self.tline.pg_version >= 17 {
@@ -1372,7 +1449,7 @@ impl DatadirModification<'_> {
             })
         }?;
         self.pending_directory_entries
-            .push((DirectoryKind::TwoPhase, 0));
+            .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
 
         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
@@ -1382,17 +1459,23 @@ impl DatadirModification<'_> {
         // harmless but they'd just be dropped on later compaction.
         if self.tline.tenant_shard_id.is_shard_zero() {
             self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
-            self.pending_directory_entries
-                .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
+            self.pending_directory_entries.push((
+                DirectoryKind::SlruSegment(SlruKind::Clog),
+                MetricsUpdate::Set(0),
+            ));
             self.put(
                 slru_dir_to_key(SlruKind::MultiXactMembers),
                 empty_dir.clone(),
             );
-            self.pending_directory_entries
-                .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
+            self.pending_directory_entries.push((
+                DirectoryKind::SlruSegment(SlruKind::Clog),
+                MetricsUpdate::Set(0),
+            ));
             self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
-            self.pending_directory_entries
-                .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
+            self.pending_directory_entries.push((
+                DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
+                MetricsUpdate::Set(0),
+            ));
         }
 
         Ok(())
@@ -1658,10 +1741,16 @@ impl DatadirModification<'_> {
         }
         if r.is_none() {
             // Create RelDirectory
+            // TODO: if we have fully migrated to v2, no need to create this directory
             let buf = RelDirectory::ser(&RelDirectory {
                 rels: HashSet::new(),
             })?;
-            self.pending_directory_entries.push((DirectoryKind::Rel, 0));
+            self.pending_directory_entries
+                .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
+            if self.tline.get_rel_size_v2_enabled() {
+                self.pending_directory_entries
+                    .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
+            }
             self.put(
                 rel_dir_to_key(spcnode, dbnode),
                 Value::Image(Bytes::from(buf)),
@@ -1685,8 +1774,10 @@ impl DatadirModification<'_> {
             if !dir.xids.insert(xid) {
                 anyhow::bail!("twophase file for xid {} already exists", xid);
             }
-            self.pending_directory_entries
-                .push((DirectoryKind::TwoPhase, dir.xids.len()));
+            self.pending_directory_entries.push((
+                DirectoryKind::TwoPhase,
+                MetricsUpdate::Set(dir.xids.len() as u64),
+            ));
             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
         } else {
             let xid = xid as u32;
@@ -1694,8 +1785,10 @@ impl DatadirModification<'_> {
             if !dir.xids.insert(xid) {
                 anyhow::bail!("twophase file for xid {} already exists", xid);
             }
-            self.pending_directory_entries
-                .push((DirectoryKind::TwoPhase, dir.xids.len()));
+            self.pending_directory_entries.push((
+                DirectoryKind::TwoPhase,
+                MetricsUpdate::Set(dir.xids.len() as u64),
+            ));
             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
         };
         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
 
@@ -1744,8 +1837,10 @@ impl DatadirModification<'_> {
         let mut dir = DbDirectory::des(&buf)?;
         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
             let buf = DbDirectory::ser(&dir)?;
-            self.pending_directory_entries
-                .push((DirectoryKind::Db, dir.dbdirs.len()));
+            self.pending_directory_entries.push((
+                DirectoryKind::Db,
+                MetricsUpdate::Set(dir.dbdirs.len() as u64),
+            ));
             self.put(DBDIR_KEY, Value::Image(buf.into()));
         } else {
             warn!(
@@ -1778,39 +1873,85 @@ impl DatadirModification<'_> {
         // tablespace. Create the reldir entry for it if so.
         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
             .context("deserialize db")?;
-        let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
-        let mut rel_dir =
+
+        let dbdir_exists =
             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
                 // Didn't exist. Update dbdir
                 e.insert(false);
                 let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
-                self.pending_directory_entries
-                    .push((DirectoryKind::Db, dbdir.dbdirs.len()));
+                self.pending_directory_entries.push((
+                    DirectoryKind::Db,
+                    MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
+                ));
                 self.put(DBDIR_KEY, Value::Image(buf.into()));
-
-                // and create the RelDirectory
-                RelDirectory::default()
+                false
             } else {
-                // reldir already exists, fetch it
-                RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
-                    .context("deserialize db")?
+                true
             };
 
+        let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
+        let mut rel_dir = if !dbdir_exists {
+            // Create the RelDirectory
+            RelDirectory::default()
+        } else {
+            // reldir already exists, fetch it
+            RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
+                .context("deserialize db")?
+        };
+
         // Add the new relation to the rel directory entry, and write it back
         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
             return Err(RelationError::AlreadyExists);
         }
-        self.pending_directory_entries
-            .push((DirectoryKind::Rel, rel_dir.rels.len()));
-
-        self.put(
-            rel_dir_key,
-            Value::Image(Bytes::from(
-                RelDirectory::ser(&rel_dir).context("serialize")?,
-            )),
-        );
-
+        if self.tline.get_rel_size_v2_enabled() {
+            let sparse_rel_dir_key =
+                rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
+            // check if the rel_dir_key exists in v2
+            let val = self
+                .sparse_get(sparse_rel_dir_key, ctx)
+                .await
+                .map_err(|e| RelationError::Other(e.into()))?;
+            let val = RelDirExists::decode_option(val)
+                .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
+            if val == RelDirExists::Exists {
+                return Err(RelationError::AlreadyExists);
+            }
+            self.put(
+                sparse_rel_dir_key,
+                Value::Image(RelDirExists::Exists.encode()),
+            );
+            if !dbdir_exists {
+                self.pending_directory_entries
+                    .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
+                self.pending_directory_entries
+                    .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
+                // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
+                // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
+                // will be key not found errors if we don't create an empty one for rel_size_v2.
+                self.put(
+                    rel_dir_key,
+                    Value::Image(Bytes::from(
+                        RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
+                    )),
+                );
+            }
+            self.pending_directory_entries
+                .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
+        } else {
+            if !dbdir_exists {
+                self.pending_directory_entries
+                    .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
+            }
+            self.pending_directory_entries
+                .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
+            self.put(
+                rel_dir_key,
+                Value::Image(Bytes::from(
+                    RelDirectory::ser(&rel_dir).context("serialize")?,
+                )),
+            );
+        }
+
         // Put size
         let size_key = rel_size_to_key(rel);
         let buf = nblocks.to_le_bytes();
@@ -1896,9 +2037,34 @@ impl DatadirModification<'_> {
 
         let mut dirty = false;
         for rel_tag in rel_tags {
-            if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
+            let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
+                self.pending_directory_entries
+                    .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
                 dirty = true;
+                true
+            } else if self.tline.get_rel_size_v2_enabled() {
+                // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
+                // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
+                // logic).
+                let key =
+                    rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
+                let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
+                    .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
+                if val == RelDirExists::Exists {
+                    self.pending_directory_entries
+                        .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
+                    // put tombstone
+                    self.put(key, Value::Image(RelDirExists::Removed.encode()));
+                    // no need to set dirty to true
+                    true
+                } else {
+                    false
+                }
+            } else {
+                false
+            };
+
+            if found {
                 // update logical size
                 let size_key = rel_size_to_key(rel_tag);
                 let old_size = self.get(size_key, ctx).await?.get_u32_le();
@@ -1914,8 +2080,6 @@ impl DatadirModification<'_> {
 
         if dirty {
             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
-            self.pending_directory_entries
-                .push((DirectoryKind::Rel, dir.rels.len()));
         }
     }
 
@@ -1939,8 +2103,10 @@ impl DatadirModification<'_> {
         if !dir.segments.insert(segno) {
             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
         }
-        self.pending_directory_entries
-            .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
+        self.pending_directory_entries.push((
+            DirectoryKind::SlruSegment(kind),
+            MetricsUpdate::Set(dir.segments.len() as u64),
+        ));
         self.put(
             dir_key,
             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
@@ -1987,8 +2153,10 @@ impl DatadirModification<'_> {
         if !dir.segments.remove(&segno) {
             warn!("slru segment {:?}/{} does not exist", kind, segno);
         }
-        self.pending_directory_entries
-            .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
+        self.pending_directory_entries.push((
+            DirectoryKind::SlruSegment(kind),
+            MetricsUpdate::Set(dir.segments.len() as u64),
+        ));
         self.put(
             dir_key,
             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
@@ -2020,8 +2188,10 @@ impl DatadirModification<'_> {
         if !dir.xids.remove(&xid) {
             warn!("twophase file for xid {} does not exist", xid);
         }
-        self.pending_directory_entries
-            .push((DirectoryKind::TwoPhase, dir.xids.len()));
+        self.pending_directory_entries.push((
+            DirectoryKind::TwoPhase,
+            MetricsUpdate::Set(dir.xids.len() as u64),
+        ));
         Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
         } else {
             let xid: u32 = u32::try_from(xid)?;
@@ -2030,8 +2200,10 @@ impl DatadirModification<'_> {
             if !dir.xids.remove(&xid) {
                 warn!("twophase file for xid {} does not exist", xid);
             }
-            self.pending_directory_entries
-                .push((DirectoryKind::TwoPhase, dir.xids.len()));
+            self.pending_directory_entries.push((
+                DirectoryKind::TwoPhase,
+                MetricsUpdate::Set(dir.xids.len() as u64),
+            ));
             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
         };
         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
@@ -2147,7 +2319,7 @@ impl DatadirModification<'_> {
         }
 
         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
-            writer.update_directory_entries_count(kind, count as u64);
+            writer.update_directory_entries_count(kind, count);
         }
 
         Ok(())
@@ -2233,7 +2405,7 @@ impl DatadirModification<'_> {
         }
 
         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
-            writer.update_directory_entries_count(kind, count as u64);
+            writer.update_directory_entries_count(kind, count);
         }
 
         self.pending_metadata_bytes = 0;
@@ -2297,6 +2469,22 @@ impl DatadirModification<'_> {
         self.tline.get(key, lsn, ctx).await
     }
 
+    /// Get a key from the sparse keyspace. Automatically converts the missing key error
+    /// and the empty value into None.
+    async fn sparse_get(
+        &self,
+        key: Key,
+        ctx: &RequestContext,
+    ) -> Result<Option<Bytes>, PageReconstructError> {
+        let val = self.get(key, ctx).await;
+        match val {
+            Ok(val) if val.is_empty() => Ok(None),
+            Ok(val) => Ok(Some(val)),
+            Err(PageReconstructError::MissingKey(_)) => Ok(None),
+            Err(e) => Err(e),
+        }
+    }
+
     fn put(&mut self, key: Key, val: Value) {
         if Self::is_data_key(&key) {
             self.put_data(key.to_compact(), val)
@@ -2379,6 +2567,23 @@ impl Version<'_> {
         }
     }
 
+    /// Get a key from the sparse keyspace. Automatically converts the missing key error
+    /// and the empty value into None.
+    async fn sparse_get(
+        &self,
+        timeline: &Timeline,
+        key: Key,
+        ctx: &RequestContext,
+    ) -> Result<Option<Bytes>, PageReconstructError> {
+        let val = self.get(timeline, key, ctx).await;
+        match val {
+            Ok(val) if val.is_empty() => Ok(None),
+            Ok(val) => Ok(Some(val)),
+            Err(PageReconstructError::MissingKey(_)) => Ok(None),
+            Err(e) => Err(e),
+        }
+    }
+
     fn get_lsn(&self) -> Lsn {
         match self {
             Version::Lsn(lsn) => *lsn,
@@ -2438,6 +2643,7 @@ pub(crate) enum DirectoryKind {
     Rel,
     AuxFiles,
     SlruSegment(SlruKind),
+    RelV2,
 }
 
 impl DirectoryKind {
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index dec585ff65..5a2c5c0c46 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3924,6 +3924,13 @@ impl Tenant {
             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
     }
 
+    pub fn get_rel_size_v2_enabled(&self) -> bool {
+        let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
+        tenant_conf
+            .rel_size_v2_enabled
+            .unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
+    }
+
     pub fn get_compaction_upper_limit(&self) -> usize {
         let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
         tenant_conf
@@ -5640,7 +5647,7 @@ pub(crate) mod harness {
             lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts),
             timeline_offloading: Some(tenant_conf.timeline_offloading),
             wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override,
-            rel_size_v2_enabled: tenant_conf.rel_size_v2_enabled,
+            rel_size_v2_enabled: Some(tenant_conf.rel_size_v2_enabled),
             gc_compaction_enabled: Some(tenant_conf.gc_compaction_enabled),
             gc_compaction_initial_threshold_kb: Some(
                 tenant_conf.gc_compaction_initial_threshold_kb,
diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs
index 7fdfd736ad..c6bcfdf2fb 100644
--- a/pageserver/src/tenant/config.rs
+++ b/pageserver/src/tenant/config.rs
@@ -485,7 +485,9 @@ impl TenantConfOpt {
             wal_receiver_protocol_override: self
                 .wal_receiver_protocol_override
                 .or(global_conf.wal_receiver_protocol_override),
-            rel_size_v2_enabled: self.rel_size_v2_enabled.or(global_conf.rel_size_v2_enabled),
+            rel_size_v2_enabled: self
+                .rel_size_v2_enabled
+                .unwrap_or(global_conf.rel_size_v2_enabled),
             gc_compaction_enabled: self
                 .gc_compaction_enabled
                 .unwrap_or(global_conf.gc_compaction_enabled),
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 782b7d88b0..277dce7761 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -117,7 +117,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
 use crate::config::PageServerConf;
 use crate::keyspace::{KeyPartitioning, KeySpace};
 use crate::metrics::{TimelineMetrics, DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_GLOBAL};
-use crate::pgdatadir_mapping::CalculateLogicalSizeError;
+use crate::pgdatadir_mapping::{CalculateLogicalSizeError, MetricsUpdate};
 use crate::tenant::config::TenantConfOpt;
 use pageserver_api::reltag::RelTag;
 use pageserver_api::shard::ShardIndex;
@@ -327,6 +327,7 @@ pub struct Timeline {
     // in `crate::page_service` writes these metrics.
     pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
 
+    directory_metrics_inited: [AtomicBool; DirectoryKind::KINDS_NUM],
     directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
 
     /// Ensures layers aren't frozen by checkpointer between
@@ -2355,6 +2356,14 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
     }
 
+    pub(crate) fn get_rel_size_v2_enabled(&self) -> bool {
+        let tenant_conf = self.tenant_conf.load();
+        tenant_conf
+            .tenant_conf
+            .rel_size_v2_enabled
+            .unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
+    }
+
     fn get_compaction_upper_limit(&self) -> usize {
         let tenant_conf = self.tenant_conf.load();
         tenant_conf
@@ -2664,6 +2673,7 @@ impl Timeline {
             ),
 
             directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
+            directory_metrics_inited: array::from_fn(|_| AtomicBool::new(false)),
 
             flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
 
@@ -3430,8 +3440,42 @@ impl Timeline {
         }
     }
 
-    pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
-        self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
+    pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: MetricsUpdate) {
+        // TODO: this directory metric is not correct -- there can be multiple reldirs in the
+        // system, one per database, but we only store one value, and therefore each
+        // DatadirModification would overwrite the previous value if they modify different
+        // databases.
+
+        match count {
+            MetricsUpdate::Set(count) => {
+                self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
+                self.directory_metrics_inited[kind.offset()].store(true, AtomicOrdering::Relaxed);
+            }
+            MetricsUpdate::Add(count) => {
+                // TODO: these operations are not atomic; but we only have one writer to the
+                // metrics, so it's fine.
+                if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) {
+                    // The metric has been initialized with `MetricsUpdate::Set` before, so we can
+                    // add/sub the value reliably.
+                    self.directory_metrics[kind.offset()].fetch_add(count, AtomicOrdering::Relaxed);
+                }
+                // Otherwise, ignore this update
+            }
+            MetricsUpdate::Sub(count) => {
+                // TODO: these operations are not atomic; but we only have one writer to the
+                // metrics, so it's fine.
+                if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) {
+                    // The metric has been initialized with `MetricsUpdate::Set` before.
+                    // The subtraction could underflow, so saturate the value at zero.
+                    let prev_val =
+                        self.directory_metrics[kind.offset()].load(AtomicOrdering::Relaxed);
+                    let res = prev_val.saturating_sub(count);
+                    self.directory_metrics[kind.offset()].store(res, AtomicOrdering::Relaxed);
+                }
+                // Otherwise, ignore this update
+            }
+        };
+
+        // TODO: remove this; there's no place in the code that updates this aux metric.
         let aux_metric =
             self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
@@ -3649,7 +3693,9 @@ impl Timeline {
         // space. If that's not the case, we had at least one key encounter a gap in the image layer
         // and stop the search as a result of that.
         let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
-        // Do not fire missing key error for sparse keys.
+        // Do not fire missing key error and end early for sparse keys. Note that we have already
+        // removed non-inherited keyspaces before, so we can safely do a full `SPARSE_RANGE` remove
+        // instead of figuring out what the inherited key range is and doing fine-grained pruning.
         removed.remove_overlapping_with(&KeySpace {
             ranges: vec![SPARSE_RANGE],
         });
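To make the v2 key layout concrete before the tests below: a relation's
existence marker is a single sparse key under `REL_DIR_KEY_PREFIX`, addressed
by `(spcnode, dbnode, relnode, forknum)` with `field6` pinned to 1, so all
markers for one database sit in one contiguous scan range. A small
illustration (not part of the patch), using arbitrary example OIDs:

```rust
use pageserver_api::key::{rel_tag_sparse_key, rel_tag_sparse_key_range, REL_DIR_KEY_PREFIX};

fn marker_is_inside_the_database_scan_range() {
    // 1663/5 are the stock pg_default/postgres OIDs and 16384 a typical
    // first user relation -- all illustrative values.
    let (spcnode, dbnode) = (1663, 5);
    let key = rel_tag_sparse_key(spcnode, dbnode, 16384, 0);
    assert_eq!(key.field1, REL_DIR_KEY_PREFIX);
    assert_eq!(key.field6, 1);
    // `list_rels` scans exactly this range to merge v2 entries with v1.
    assert!(rel_tag_sparse_key_range(spcnode, dbnode).contains(&key));
}
```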
diff --git a/test_runner/regress/test_relations.py b/test_runner/regress/test_relations.py
new file mode 100644
index 0000000000..3e29c92a96
--- /dev/null
+++ b/test_runner/regress/test_relations.py
@@ -0,0 +1,68 @@
+from __future__ import annotations
+
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+)
+
+
+def test_pageserver_reldir_v2(
+    neon_env_builder: NeonEnvBuilder,
+):
+    env = neon_env_builder.init_start(
+        initial_tenant_conf={
+            "rel_size_v2_enabled": "false",
+        }
+    )
+
+    endpoint = env.endpoints.create_start("main")
+    # Create a relation in v1
+    endpoint.safe_psql("CREATE TABLE foo1 (id INTEGER PRIMARY KEY, val text)")
+    endpoint.safe_psql("CREATE TABLE foo2 (id INTEGER PRIMARY KEY, val text)")
+
+    # Switch to v2
+    env.pageserver.http_client().update_tenant_config(
+        env.initial_tenant,
+        {
+            "rel_size_v2_enabled": True,
+        },
+    )
+
+    # Check if both relations are still accessible
+    endpoint.safe_psql("SELECT * FROM foo1")
+    endpoint.safe_psql("SELECT * FROM foo2")
+
+    # Restart the endpoint
+    endpoint.stop()
+    endpoint.start()
+
+    # Check if both relations are still accessible again after restart
+    endpoint.safe_psql("SELECT * FROM foo1")
+    endpoint.safe_psql("SELECT * FROM foo2")
+
+    # Create a relation in v2
+    endpoint.safe_psql("CREATE TABLE foo3 (id INTEGER PRIMARY KEY, val text)")
+    # Delete a relation in v1
+    endpoint.safe_psql("DROP TABLE foo1")
+
+    # Check if both relations are still accessible
+    endpoint.safe_psql("SELECT * FROM foo2")
+    endpoint.safe_psql("SELECT * FROM foo3")
+
+    # Restart the endpoint
+    endpoint.stop()
+    # This will take a basebackup, which lists all relations.
+    endpoint.start()
+
+    # Check if both relations are still accessible
+    endpoint.safe_psql("DROP TABLE IF EXISTS foo1")
+    endpoint.safe_psql("SELECT * FROM foo2")
+    endpoint.safe_psql("SELECT * FROM foo3")
+
+    endpoint.safe_psql("DROP TABLE foo3")
+    endpoint.stop()
+    endpoint.start()
+
+    # Check if relations are still accessible
+    endpoint.safe_psql("DROP TABLE IF EXISTS foo1")
+    endpoint.safe_psql("SELECT * FROM foo2")
+    endpoint.safe_psql("DROP TABLE IF EXISTS foo3")
diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py
index b4c968b217..afe444f227 100644
--- a/test_runner/regress/test_tenants.py
+++ b/test_runner/regress/test_tenants.py
@@ -481,7 +481,8 @@ def test_pageserver_metrics_many_relations(neon_env_builder: NeonEnvBuilder):
     counts = timeline_detail["directory_entries_counts"]
     assert counts
     log.info(f"directory counts: {counts}")
-    assert counts[2] > COUNT_AT_LEAST_EXPECTED
+    # We need to add up reldir v1 + v2 counts
+    assert counts[2] + counts[7] > COUNT_AT_LEAST_EXPECTED
 
 
 def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):
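A note on the `counts[2] + counts[7]` assertion above: `directory_entries_counts`
is indexed by `DirectoryKind` offset, and with the variant order shown earlier
(Db, TwoPhase, Rel, AuxFiles, the three `SlruSegment` kinds, then the new
`RelV2`), v1 reldir entries land in slot 2 and v2 entries in slot 7. A
sanity-check sketch under that assumption; it would only compile inside the
pageserver crate, since `DirectoryKind` and `offset()` are `pub(crate)`:

```rust
// Illustrative, crate-internal sketch: the metric slots the regression test
// reads line up with the enum layout assumed above.
use crate::pgdatadir_mapping::DirectoryKind;
use pageserver_api::reltag::SlruKind;

fn directory_metric_slots_line_up() {
    assert_eq!(DirectoryKind::Rel.offset(), 2); // v1 reldir -> counts[2]
    assert_eq!(DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets).offset(), 6);
    assert_eq!(DirectoryKind::RelV2.offset(), 7); // v2 reldir -> counts[7]
}
```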