diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 79f068a47b..e64052c73d 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -351,7 +351,7 @@ pub struct TenantConfigToml { /// Enable rel_size_v2 for this tenant. Once enabled, the tenant will persist this information into /// `index_part.json`, and it cannot be reversed. - pub rel_size_v2_enabled: Option, + pub rel_size_v2_enabled: bool, // gc-compaction related configs /// Enable automatic gc-compaction trigger on this tenant. @@ -633,7 +633,7 @@ impl Default for TenantConfigToml { lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS, timeline_offloading: true, wal_receiver_protocol_override: None, - rel_size_v2_enabled: None, + rel_size_v2_enabled: false, gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED, gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB, gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT, diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index dbd45da314..b88a2e46a1 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -1,10 +1,12 @@ use anyhow::{bail, Result}; use byteorder::{ByteOrder, BE}; +use bytes::Bytes; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::Oid; use postgres_ffi::RepOriginId; use serde::{Deserialize, Serialize}; use std::{fmt, ops::Range}; +use utils::const_assert; use crate::reltag::{BlockNumber, RelTag, SlruKind}; @@ -49,6 +51,64 @@ pub const AUX_KEY_PREFIX: u8 = 0x62; /// The key prefix of ReplOrigin keys. pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63; +/// The key prefix of db directory keys. +pub const DB_DIR_KEY_PREFIX: u8 = 0x64; + +/// The key prefix of rel directory keys. +pub const REL_DIR_KEY_PREFIX: u8 = 0x65; + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub enum RelDirExists { + Exists, + Removed, +} + +#[derive(Debug)] +pub struct DecodeError; + +impl fmt::Display for DecodeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "invalid marker") + } +} + +impl std::error::Error for DecodeError {} + +impl RelDirExists { + /// The value of the rel directory keys that indicates the existence of a relation. + const REL_EXISTS_MARKER: Bytes = Bytes::from_static(b"r"); + + pub fn encode(&self) -> Bytes { + match self { + Self::Exists => Self::REL_EXISTS_MARKER.clone(), + Self::Removed => SPARSE_TOMBSTONE_MARKER.clone(), + } + } + + pub fn decode_option(data: Option>) -> Result { + match data { + Some(marker) if marker.as_ref() == Self::REL_EXISTS_MARKER => Ok(Self::Exists), + // Any other marker is invalid + Some(_) => Err(DecodeError), + None => Ok(Self::Removed), + } + } + + pub fn decode(data: impl AsRef<[u8]>) -> Result { + let data = data.as_ref(); + if data == Self::REL_EXISTS_MARKER { + Ok(Self::Exists) + } else if data == SPARSE_TOMBSTONE_MARKER { + Ok(Self::Removed) + } else { + Err(DecodeError) + } + } +} + +/// A tombstone in the sparse keyspace, which is an empty buffer. +pub const SPARSE_TOMBSTONE_MARKER: Bytes = Bytes::from_static(b""); + /// Check if the key falls in the range of metadata keys. pub const fn is_metadata_key_slice(key: &[u8]) -> bool { key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX @@ -110,6 +170,24 @@ impl Key { } } + pub fn rel_dir_sparse_key_range() -> Range { + Key { + field1: REL_DIR_KEY_PREFIX, + field2: 0, + field3: 0, + field4: 0, + field5: 0, + field6: 0, + }..Key { + field1: REL_DIR_KEY_PREFIX + 1, + field2: 0, + field3: 0, + field4: 0, + field5: 0, + field6: 0, + } + } + /// This function checks more extensively what keys we can take on the write path. /// If a key beginning with 00 does not have a global/default tablespace OID, it /// will be rejected on the write path. @@ -440,6 +518,36 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key { } } +#[inline(always)] +pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key { + Key { + field1: REL_DIR_KEY_PREFIX, + field2: spcnode, + field3: dbnode, + field4: relnode, + field5: forknum, + field6: 1, + } +} + +pub fn rel_tag_sparse_key_range(spcnode: Oid, dbnode: Oid) -> Range { + Key { + field1: REL_DIR_KEY_PREFIX, + field2: spcnode, + field3: dbnode, + field4: 0, + field5: 0, + field6: 0, + }..Key { + field1: REL_DIR_KEY_PREFIX, + field2: spcnode, + field3: dbnode, + field4: u32::MAX, + field5: u8::MAX, + field6: u32::MAX, + } // it's fine to exclude the last key b/c we only use field6 == 1 +} + #[inline(always)] pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key { Key { @@ -734,9 +842,9 @@ impl Key { self.field1 == RELATION_SIZE_PREFIX } - pub fn sparse_non_inherited_keyspace() -> Range { + pub const fn sparse_non_inherited_keyspace() -> Range { // The two keys are adjacent; if we will have non-adjancent keys in the future, we should return a keyspace - debug_assert_eq!(AUX_KEY_PREFIX + 1, REPL_ORIGIN_KEY_PREFIX); + const_assert!(AUX_KEY_PREFIX + 1 == REPL_ORIGIN_KEY_PREFIX); Key { field1: AUX_KEY_PREFIX, field2: 0, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index f2dca8befa..ae2762bd1e 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -23,13 +23,14 @@ use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; use itertools::Itertools; -use pageserver_api::key::Key; use pageserver_api::key::{ dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, - relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key, - slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range, - CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, + rel_tag_sparse_key_range, relmap_file_key, repl_origin_key, repl_origin_key_range, + slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key, + twophase_file_key, twophase_key_range, CompactKey, RelDirExists, AUX_FILES_KEY, CHECKPOINT_KEY, + CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, }; +use pageserver_api::key::{rel_tag_sparse_key, Key}; use pageserver_api::keyspace::SparseKeySpace; use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; @@ -490,12 +491,33 @@ impl Timeline { if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) { return Ok(false); } - // fetch directory listing + + // Read path: first read the new reldir keyspace. Early return if the relation exists. + // Otherwise, read the old reldir keyspace. + // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2. + + if self.get_rel_size_v2_enabled() { + // fetch directory listing (new) + let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum); + let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?) + .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?; + let exists_v2 = buf == RelDirExists::Exists; + // Fast path: if the relation exists in the new format, return true. + // TODO: we should have a verification mode that checks both keyspaces + // to ensure the relation only exists in one of them. + if exists_v2 { + return Ok(true); + } + } + + // fetch directory listing (old) + let key = rel_dir_to_key(tag.spcnode, tag.dbnode); let buf = version.get(self, key, ctx).await?; let dir = RelDirectory::des(&buf)?; - Ok(dir.rels.contains(&(tag.relnode, tag.forknum))) + let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum)); + Ok(exists_v1) } /// Get a list of all existing relations in given tablespace and database. @@ -513,12 +535,12 @@ impl Timeline { version: Version<'_>, ctx: &RequestContext, ) -> Result, PageReconstructError> { - // fetch directory listing + // fetch directory listing (old) let key = rel_dir_to_key(spcnode, dbnode); let buf = version.get(self, key, ctx).await?; let dir = RelDirectory::des(&buf)?; - let rels: HashSet = + let rels_v1: HashSet = HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag { spcnode, dbnode, @@ -526,6 +548,46 @@ impl Timeline { forknum: *forknum, })); + if !self.get_rel_size_v2_enabled() { + return Ok(rels_v1); + } + + // scan directory listing (new), merge with the old results + let key_range = rel_tag_sparse_key_range(spcnode, dbnode); + let io_concurrency = IoConcurrency::spawn_from_conf( + self.conf, + self.gate + .enter() + .map_err(|_| PageReconstructError::Cancelled)?, + ); + let results = self + .scan( + KeySpace::single(key_range), + version.get_lsn(), + ctx, + io_concurrency, + ) + .await?; + let mut rels = rels_v1; + for (key, val) in results { + let val = RelDirExists::decode(&val?) + .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?; + assert_eq!(key.field6, 1); + assert_eq!(key.field2, spcnode); + assert_eq!(key.field3, dbnode); + let tag = RelTag { + spcnode, + dbnode, + relnode: key.field4, + forknum: key.field5, + }; + if val == RelDirExists::Removed { + debug_assert!(!rels.contains(&tag), "removed reltag in v2"); + continue; + } + let did_not_contain = rels.insert(tag); + debug_assert!(did_not_contain, "duplicate reltag in v2"); + } Ok(rels) } @@ -1144,7 +1206,11 @@ impl Timeline { let dense_keyspace = result.to_keyspace(); let sparse_keyspace = SparseKeySpace(KeySpace { - ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()], + ranges: vec![ + Key::metadata_aux_key_range(), + repl_origin_key_range(), + Key::rel_dir_sparse_key_range(), + ], }); if cfg!(debug_assertions) { @@ -1274,12 +1340,22 @@ pub struct DatadirModification<'a> { /// For special "directory" keys that store key-value maps, track the size of the map /// if it was updated in this modification. - pending_directory_entries: Vec<(DirectoryKind, usize)>, + pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>, /// An **approximation** of how many metadata bytes will be written to the EphemeralFile. pending_metadata_bytes: usize, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MetricsUpdate { + /// Set the metrics to this value + Set(u64), + /// Increment the metrics by this value + Add(u64), + /// Decrement the metrics by this value + Sub(u64), +} + impl DatadirModification<'_> { // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we @@ -1359,7 +1435,8 @@ impl DatadirModification<'_> { let buf = DbDirectory::ser(&DbDirectory { dbdirs: HashMap::new(), })?; - self.pending_directory_entries.push((DirectoryKind::Db, 0)); + self.pending_directory_entries + .push((DirectoryKind::Db, MetricsUpdate::Set(0))); self.put(DBDIR_KEY, Value::Image(buf.into())); let buf = if self.tline.pg_version >= 17 { @@ -1372,7 +1449,7 @@ impl DatadirModification<'_> { }) }?; self.pending_directory_entries - .push((DirectoryKind::TwoPhase, 0)); + .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0))); self.put(TWOPHASEDIR_KEY, Value::Image(buf.into())); let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into(); @@ -1382,17 +1459,23 @@ impl DatadirModification<'_> { // harmless but they'd just be dropped on later compaction. if self.tline.tenant_shard_id.is_shard_zero() { self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone()); - self.pending_directory_entries - .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0)); + self.pending_directory_entries.push(( + DirectoryKind::SlruSegment(SlruKind::Clog), + MetricsUpdate::Set(0), + )); self.put( slru_dir_to_key(SlruKind::MultiXactMembers), empty_dir.clone(), ); - self.pending_directory_entries - .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0)); + self.pending_directory_entries.push(( + DirectoryKind::SlruSegment(SlruKind::Clog), + MetricsUpdate::Set(0), + )); self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir); - self.pending_directory_entries - .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0)); + self.pending_directory_entries.push(( + DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), + MetricsUpdate::Set(0), + )); } Ok(()) @@ -1658,10 +1741,16 @@ impl DatadirModification<'_> { } if r.is_none() { // Create RelDirectory + // TODO: if we have fully migrated to v2, no need to create this directory let buf = RelDirectory::ser(&RelDirectory { rels: HashSet::new(), })?; - self.pending_directory_entries.push((DirectoryKind::Rel, 0)); + self.pending_directory_entries + .push((DirectoryKind::Rel, MetricsUpdate::Set(0))); + if self.tline.get_rel_size_v2_enabled() { + self.pending_directory_entries + .push((DirectoryKind::RelV2, MetricsUpdate::Set(0))); + } self.put( rel_dir_to_key(spcnode, dbnode), Value::Image(Bytes::from(buf)), @@ -1685,8 +1774,10 @@ impl DatadirModification<'_> { if !dir.xids.insert(xid) { anyhow::bail!("twophase file for xid {} already exists", xid); } - self.pending_directory_entries - .push((DirectoryKind::TwoPhase, dir.xids.len())); + self.pending_directory_entries.push(( + DirectoryKind::TwoPhase, + MetricsUpdate::Set(dir.xids.len() as u64), + )); Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?) } else { let xid = xid as u32; @@ -1694,8 +1785,10 @@ impl DatadirModification<'_> { if !dir.xids.insert(xid) { anyhow::bail!("twophase file for xid {} already exists", xid); } - self.pending_directory_entries - .push((DirectoryKind::TwoPhase, dir.xids.len())); + self.pending_directory_entries.push(( + DirectoryKind::TwoPhase, + MetricsUpdate::Set(dir.xids.len() as u64), + )); Bytes::from(TwoPhaseDirectory::ser(&dir)?) }; self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf)); @@ -1744,8 +1837,10 @@ impl DatadirModification<'_> { let mut dir = DbDirectory::des(&buf)?; if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() { let buf = DbDirectory::ser(&dir)?; - self.pending_directory_entries - .push((DirectoryKind::Db, dir.dbdirs.len())); + self.pending_directory_entries.push(( + DirectoryKind::Db, + MetricsUpdate::Set(dir.dbdirs.len() as u64), + )); self.put(DBDIR_KEY, Value::Image(buf.into())); } else { warn!( @@ -1778,39 +1873,85 @@ impl DatadirModification<'_> { // tablespace. Create the reldir entry for it if so. let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?) .context("deserialize db")?; - let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); - let mut rel_dir = + + let dbdir_exists = if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) { // Didn't exist. Update dbdir e.insert(false); let buf = DbDirectory::ser(&dbdir).context("serialize db")?; - self.pending_directory_entries - .push((DirectoryKind::Db, dbdir.dbdirs.len())); + self.pending_directory_entries.push(( + DirectoryKind::Db, + MetricsUpdate::Set(dbdir.dbdirs.len() as u64), + )); self.put(DBDIR_KEY, Value::Image(buf.into())); - - // and create the RelDirectory - RelDirectory::default() + false } else { - // reldir already exists, fetch it - RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?) - .context("deserialize db")? + true }; + let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); + let mut rel_dir = if !dbdir_exists { + // Create the RelDirectory + RelDirectory::default() + } else { + // reldir already exists, fetch it + RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?) + .context("deserialize db")? + }; + // Add the new relation to the rel directory entry, and write it back if !rel_dir.rels.insert((rel.relnode, rel.forknum)) { return Err(RelationError::AlreadyExists); } - self.pending_directory_entries - .push((DirectoryKind::Rel, rel_dir.rels.len())); - - self.put( - rel_dir_key, - Value::Image(Bytes::from( - RelDirectory::ser(&rel_dir).context("serialize")?, - )), - ); - + if self.tline.get_rel_size_v2_enabled() { + let sparse_rel_dir_key = + rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum); + // check if the rel_dir_key exists in v2 + let val = self + .sparse_get(sparse_rel_dir_key, ctx) + .await + .map_err(|e| RelationError::Other(e.into()))?; + let val = RelDirExists::decode_option(val) + .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?; + if val == RelDirExists::Exists { + return Err(RelationError::AlreadyExists); + } + self.put( + sparse_rel_dir_key, + Value::Image(RelDirExists::Exists.encode()), + ); + if !dbdir_exists { + self.pending_directory_entries + .push((DirectoryKind::Rel, MetricsUpdate::Set(0))); + self.pending_directory_entries + .push((DirectoryKind::RelV2, MetricsUpdate::Set(0))); + // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation. + // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there + // will be key not found errors if we don't create an empty one for rel_size_v2. + self.put( + rel_dir_key, + Value::Image(Bytes::from( + RelDirectory::ser(&RelDirectory::default()).context("serialize")?, + )), + ); + } + self.pending_directory_entries + .push((DirectoryKind::RelV2, MetricsUpdate::Add(1))); + } else { + if !dbdir_exists { + self.pending_directory_entries + .push((DirectoryKind::Rel, MetricsUpdate::Set(0))) + } + self.pending_directory_entries + .push((DirectoryKind::Rel, MetricsUpdate::Add(1))); + self.put( + rel_dir_key, + Value::Image(Bytes::from( + RelDirectory::ser(&rel_dir).context("serialize")?, + )), + ); + } // Put size let size_key = rel_size_to_key(rel); let buf = nblocks.to_le_bytes(); @@ -1896,9 +2037,34 @@ impl DatadirModification<'_> { let mut dirty = false; for rel_tag in rel_tags { - if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) { + let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) { + self.pending_directory_entries + .push((DirectoryKind::Rel, MetricsUpdate::Sub(1))); dirty = true; + true + } else if self.tline.get_rel_size_v2_enabled() { + // The rel is not found in the old reldir key, so we need to check the new sparse keyspace. + // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion + // logic). + let key = + rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum); + let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?) + .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?; + if val == RelDirExists::Exists { + self.pending_directory_entries + .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1))); + // put tombstone + self.put(key, Value::Image(RelDirExists::Removed.encode())); + // no need to set dirty to true + true + } else { + false + } + } else { + false + }; + if found { // update logical size let size_key = rel_size_to_key(rel_tag); let old_size = self.get(size_key, ctx).await?.get_u32_le(); @@ -1914,8 +2080,6 @@ impl DatadirModification<'_> { if dirty { self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?))); - self.pending_directory_entries - .push((DirectoryKind::Rel, dir.rels.len())); } } @@ -1939,8 +2103,10 @@ impl DatadirModification<'_> { if !dir.segments.insert(segno) { anyhow::bail!("slru segment {kind:?}/{segno} already exists"); } - self.pending_directory_entries - .push((DirectoryKind::SlruSegment(kind), dir.segments.len())); + self.pending_directory_entries.push(( + DirectoryKind::SlruSegment(kind), + MetricsUpdate::Set(dir.segments.len() as u64), + )); self.put( dir_key, Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)), @@ -1987,8 +2153,10 @@ impl DatadirModification<'_> { if !dir.segments.remove(&segno) { warn!("slru segment {:?}/{} does not exist", kind, segno); } - self.pending_directory_entries - .push((DirectoryKind::SlruSegment(kind), dir.segments.len())); + self.pending_directory_entries.push(( + DirectoryKind::SlruSegment(kind), + MetricsUpdate::Set(dir.segments.len() as u64), + )); self.put( dir_key, Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)), @@ -2020,8 +2188,10 @@ impl DatadirModification<'_> { if !dir.xids.remove(&xid) { warn!("twophase file for xid {} does not exist", xid); } - self.pending_directory_entries - .push((DirectoryKind::TwoPhase, dir.xids.len())); + self.pending_directory_entries.push(( + DirectoryKind::TwoPhase, + MetricsUpdate::Set(dir.xids.len() as u64), + )); Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?) } else { let xid: u32 = u32::try_from(xid)?; @@ -2030,8 +2200,10 @@ impl DatadirModification<'_> { if !dir.xids.remove(&xid) { warn!("twophase file for xid {} does not exist", xid); } - self.pending_directory_entries - .push((DirectoryKind::TwoPhase, dir.xids.len())); + self.pending_directory_entries.push(( + DirectoryKind::TwoPhase, + MetricsUpdate::Set(dir.xids.len() as u64), + )); Bytes::from(TwoPhaseDirectory::ser(&dir)?) }; self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf)); @@ -2147,7 +2319,7 @@ impl DatadirModification<'_> { } for (kind, count) in std::mem::take(&mut self.pending_directory_entries) { - writer.update_directory_entries_count(kind, count as u64); + writer.update_directory_entries_count(kind, count); } Ok(()) @@ -2233,7 +2405,7 @@ impl DatadirModification<'_> { } for (kind, count) in std::mem::take(&mut self.pending_directory_entries) { - writer.update_directory_entries_count(kind, count as u64); + writer.update_directory_entries_count(kind, count); } self.pending_metadata_bytes = 0; @@ -2297,6 +2469,22 @@ impl DatadirModification<'_> { self.tline.get(key, lsn, ctx).await } + /// Get a key from the sparse keyspace. Automatically converts the missing key error + /// and the empty value into None. + async fn sparse_get( + &self, + key: Key, + ctx: &RequestContext, + ) -> Result, PageReconstructError> { + let val = self.get(key, ctx).await; + match val { + Ok(val) if val.is_empty() => Ok(None), + Ok(val) => Ok(Some(val)), + Err(PageReconstructError::MissingKey(_)) => Ok(None), + Err(e) => Err(e), + } + } + fn put(&mut self, key: Key, val: Value) { if Self::is_data_key(&key) { self.put_data(key.to_compact(), val) @@ -2379,6 +2567,23 @@ impl Version<'_> { } } + /// Get a key from the sparse keyspace. Automatically converts the missing key error + /// and the empty value into None. + async fn sparse_get( + &self, + timeline: &Timeline, + key: Key, + ctx: &RequestContext, + ) -> Result, PageReconstructError> { + let val = self.get(timeline, key, ctx).await; + match val { + Ok(val) if val.is_empty() => Ok(None), + Ok(val) => Ok(Some(val)), + Err(PageReconstructError::MissingKey(_)) => Ok(None), + Err(e) => Err(e), + } + } + fn get_lsn(&self) -> Lsn { match self { Version::Lsn(lsn) => *lsn, @@ -2438,6 +2643,7 @@ pub(crate) enum DirectoryKind { Rel, AuxFiles, SlruSegment(SlruKind), + RelV2, } impl DirectoryKind { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index dec585ff65..5a2c5c0c46 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3924,6 +3924,13 @@ impl Tenant { .unwrap_or(self.conf.default_tenant_conf.compaction_threshold) } + pub fn get_rel_size_v2_enabled(&self) -> bool { + let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); + tenant_conf + .rel_size_v2_enabled + .unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled) + } + pub fn get_compaction_upper_limit(&self) -> usize { let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); tenant_conf @@ -5640,7 +5647,7 @@ pub(crate) mod harness { lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts), timeline_offloading: Some(tenant_conf.timeline_offloading), wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override, - rel_size_v2_enabled: tenant_conf.rel_size_v2_enabled, + rel_size_v2_enabled: Some(tenant_conf.rel_size_v2_enabled), gc_compaction_enabled: Some(tenant_conf.gc_compaction_enabled), gc_compaction_initial_threshold_kb: Some( tenant_conf.gc_compaction_initial_threshold_kb, diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 7fdfd736ad..c6bcfdf2fb 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -485,7 +485,9 @@ impl TenantConfOpt { wal_receiver_protocol_override: self .wal_receiver_protocol_override .or(global_conf.wal_receiver_protocol_override), - rel_size_v2_enabled: self.rel_size_v2_enabled.or(global_conf.rel_size_v2_enabled), + rel_size_v2_enabled: self + .rel_size_v2_enabled + .unwrap_or(global_conf.rel_size_v2_enabled), gc_compaction_enabled: self .gc_compaction_enabled .unwrap_or(global_conf.gc_compaction_enabled), diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 782b7d88b0..277dce7761 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -117,7 +117,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL; use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace}; use crate::metrics::{TimelineMetrics, DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_GLOBAL}; -use crate::pgdatadir_mapping::CalculateLogicalSizeError; +use crate::pgdatadir_mapping::{CalculateLogicalSizeError, MetricsUpdate}; use crate::tenant::config::TenantConfOpt; use pageserver_api::reltag::RelTag; use pageserver_api::shard::ShardIndex; @@ -327,6 +327,7 @@ pub struct Timeline { // in `crate::page_service` writes these metrics. pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline, + directory_metrics_inited: [AtomicBool; DirectoryKind::KINDS_NUM], directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM], /// Ensures layers aren't frozen by checkpointer between @@ -2355,6 +2356,14 @@ impl Timeline { .unwrap_or(self.conf.default_tenant_conf.compaction_threshold) } + pub(crate) fn get_rel_size_v2_enabled(&self) -> bool { + let tenant_conf = self.tenant_conf.load(); + tenant_conf + .tenant_conf + .rel_size_v2_enabled + .unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled) + } + fn get_compaction_upper_limit(&self) -> usize { let tenant_conf = self.tenant_conf.load(); tenant_conf @@ -2664,6 +2673,7 @@ impl Timeline { ), directory_metrics: array::from_fn(|_| AtomicU64::new(0)), + directory_metrics_inited: array::from_fn(|_| AtomicBool::new(false)), flush_loop_state: Mutex::new(FlushLoopState::NotStarted), @@ -3430,8 +3440,42 @@ impl Timeline { } } - pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) { - self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed); + pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: MetricsUpdate) { + // TODO: this directory metrics is not correct -- we could have multiple reldirs in the system + // for each of the database, but we only store one value, and therefore each pgdirmodification + // would overwrite the previous value if they modify different databases. + + match count { + MetricsUpdate::Set(count) => { + self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed); + self.directory_metrics_inited[kind.offset()].store(true, AtomicOrdering::Relaxed); + } + MetricsUpdate::Add(count) => { + // TODO: these operations are not atomic; but we only have one writer to the metrics, so + // it's fine. + if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) { + // The metrics has been initialized with `MetricsUpdate::Set` before, so we can add/sub + // the value reliably. + self.directory_metrics[kind.offset()].fetch_add(count, AtomicOrdering::Relaxed); + } + // Otherwise, ignore this update + } + MetricsUpdate::Sub(count) => { + // TODO: these operations are not atomic; but we only have one writer to the metrics, so + // it's fine. + if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) { + // The metrics has been initialized with `MetricsUpdate::Set` before. + // The operation could overflow so we need to normalize the value. + let prev_val = + self.directory_metrics[kind.offset()].load(AtomicOrdering::Relaxed); + let res = prev_val.saturating_sub(count); + self.directory_metrics[kind.offset()].store(res, AtomicOrdering::Relaxed); + } + // Otherwise, ignore this update + } + }; + + // TODO: remove this, there's no place in the code that updates this aux metrics. let aux_metric = self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed); @@ -3649,7 +3693,9 @@ impl Timeline { // space. If that's not the case, we had at least one key encounter a gap in the image layer // and stop the search as a result of that. let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace); - // Do not fire missing key error for sparse keys. + // Do not fire missing key error and end early for sparse keys. Note that we hava already removed + // non-inherited keyspaces before, so we can safely do a full `SPARSE_RANGE` remove instead of + // figuring out what is the inherited key range and do a fine-grained pruning. removed.remove_overlapping_with(&KeySpace { ranges: vec![SPARSE_RANGE], }); diff --git a/test_runner/regress/test_relations.py b/test_runner/regress/test_relations.py new file mode 100644 index 0000000000..3e29c92a96 --- /dev/null +++ b/test_runner/regress/test_relations.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from fixtures.neon_fixtures import ( + NeonEnvBuilder, +) + + +def test_pageserver_reldir_v2( + neon_env_builder: NeonEnvBuilder, +): + env = neon_env_builder.init_start( + initial_tenant_conf={ + "rel_size_v2_enabled": "false", + } + ) + + endpoint = env.endpoints.create_start("main") + # Create a relation in v1 + endpoint.safe_psql("CREATE TABLE foo1 (id INTEGER PRIMARY KEY, val text)") + endpoint.safe_psql("CREATE TABLE foo2 (id INTEGER PRIMARY KEY, val text)") + + # Switch to v2 + env.pageserver.http_client().update_tenant_config( + env.initial_tenant, + { + "rel_size_v2_enabled": True, + }, + ) + + # Check if both relations are still accessible + endpoint.safe_psql("SELECT * FROM foo1") + endpoint.safe_psql("SELECT * FROM foo2") + + # Restart the endpoint + endpoint.stop() + endpoint.start() + + # Check if both relations are still accessible again after restart + endpoint.safe_psql("SELECT * FROM foo1") + endpoint.safe_psql("SELECT * FROM foo2") + + # Create a relation in v2 + endpoint.safe_psql("CREATE TABLE foo3 (id INTEGER PRIMARY KEY, val text)") + # Delete a relation in v1 + endpoint.safe_psql("DROP TABLE foo1") + + # Check if both relations are still accessible + endpoint.safe_psql("SELECT * FROM foo2") + endpoint.safe_psql("SELECT * FROM foo3") + + # Restart the endpoint + endpoint.stop() + # This will acquire a basebackup, which lists all relations. + endpoint.start() + + # Check if both relations are still accessible + endpoint.safe_psql("DROP TABLE IF EXISTS foo1") + endpoint.safe_psql("SELECT * FROM foo2") + endpoint.safe_psql("SELECT * FROM foo3") + + endpoint.safe_psql("DROP TABLE foo3") + endpoint.stop() + endpoint.start() + + # Check if relations are still accessible + endpoint.safe_psql("DROP TABLE IF EXISTS foo1") + endpoint.safe_psql("SELECT * FROM foo2") + endpoint.safe_psql("DROP TABLE IF EXISTS foo3") diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index b4c968b217..afe444f227 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -481,7 +481,8 @@ def test_pageserver_metrics_many_relations(neon_env_builder: NeonEnvBuilder): counts = timeline_detail["directory_entries_counts"] assert counts log.info(f"directory counts: {counts}") - assert counts[2] > COUNT_AT_LEAST_EXPECTED + # We need to add up reldir v1 + v2 counts + assert counts[2] + counts[7] > COUNT_AT_LEAST_EXPECTED def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):