diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 7c7c65fb70..230c1f46ea 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1500,6 +1500,7 @@ pub struct TimelineArchivalConfigRequest { #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)] pub struct TimelinePatchIndexPartRequest { pub rel_size_migration: Option, + pub rel_size_migrated_at: Option, pub gc_compaction_last_completed_lsn: Option, pub applied_gc_cutoff_lsn: Option, #[serde(default)] @@ -1533,10 +1534,10 @@ pub enum RelSizeMigration { /// `None` is the same as `Some(RelSizeMigration::Legacy)`. Legacy, /// The tenant is migrating to the new rel_size format. Both old and new rel_size format are - /// persisted in the index part. The read path will read both formats and merge them. + /// persisted in the storage. The read path will read both formats and validate them. Migrating, /// The tenant has migrated to the new rel_size format. Only the new rel_size format is persisted - /// in the index part, and the read path will not read the old format. + /// in the storage, and the read path will not read the old format. Migrated, } @@ -1619,6 +1620,7 @@ pub struct TimelineInfo { /// The status of the rel_size migration. pub rel_size_migration: Option, + pub rel_size_migrated_at: Option, /// Whether the timeline is invisible in synthetic size calculations. pub is_invisible: Option, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 3a08244d71..a0ea9b90dc 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -484,6 +484,8 @@ async fn build_timeline_info_common( *timeline.get_applied_gc_cutoff_lsn(), ); + let (rel_size_migration, rel_size_migrated_at) = timeline.get_rel_size_v2_status(); + let info = TimelineInfo { tenant_id: timeline.tenant_shard_id, timeline_id: timeline.timeline_id, @@ -515,7 +517,8 @@ async fn build_timeline_info_common( state, is_archived: Some(is_archived), - rel_size_migration: Some(timeline.get_rel_size_v2_status()), + rel_size_migration: Some(rel_size_migration), + rel_size_migrated_at, is_invisible: Some(is_invisible), walreceiver_status, @@ -930,9 +933,16 @@ async fn timeline_patch_index_part_handler( active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; + if request_data.rel_size_migration.is_none() && request_data.rel_size_migrated_at.is_some() + { + return Err(ApiError::BadRequest(anyhow!( + "updating rel_size_migrated_at without rel_size_migration is not allowed" + ))); + } + if let Some(rel_size_migration) = request_data.rel_size_migration { timeline - .update_rel_size_v2_status(rel_size_migration) + .update_rel_size_v2_status(rel_size_migration, request_data.rel_size_migrated_at) .map_err(ApiError::InternalServerError)?; } diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 409cc2e3c5..5b674adbb3 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -57,7 +57,7 @@ pub async fn import_timeline_from_postgres_datadir( // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn) // Then fishing out pg_control would be unnecessary - let mut modification = tline.begin_modification(pgdata_lsn); + let mut modification = tline.begin_modification_for_import(pgdata_lsn); modification.init_empty()?; // Import all but pg_wal @@ -309,7 +309,7 @@ async fn import_wal( waldecoder.feed_bytes(&buf); let mut nrecords = 0; - let mut modification = 
tline.begin_modification(last_lsn); + let mut modification = tline.begin_modification_for_import(last_lsn); while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { let interpreted = InterpretedWalRecord::from_bytes_filtered( @@ -357,7 +357,7 @@ pub async fn import_basebackup_from_tar( ctx: &RequestContext, ) -> Result<()> { info!("importing base at {base_lsn}"); - let mut modification = tline.begin_modification(base_lsn); + let mut modification = tline.begin_modification_for_import(base_lsn); modification.init_empty()?; let mut pg_control: Option = None; @@ -457,7 +457,7 @@ pub async fn import_wal_from_tar( waldecoder.feed_bytes(&bytes[offset..]); - let mut modification = tline.begin_modification(last_lsn); + let mut modification = tline.begin_modification_for_import(last_lsn); while last_lsn <= end_lsn { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { let interpreted = InterpretedWalRecord::from_bytes_filtered( diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index c9f3184188..cedf77fb37 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -6,7 +6,7 @@ //! walingest.rs handles a few things like implicit relation creation and extension. //! Clarify that) //! -use std::collections::{HashMap, HashSet, hash_map}; +use std::collections::{BTreeSet, HashMap, HashSet, hash_map}; use std::ops::{ControlFlow, Range}; use std::sync::Arc; @@ -227,6 +227,25 @@ impl Timeline { pending_nblocks: 0, pending_directory_entries: Vec::new(), pending_metadata_bytes: 0, + is_importing_pgdata: false, + lsn, + } + } + + pub fn begin_modification_for_import(&self, lsn: Lsn) -> DatadirModification + where + Self: Sized, + { + DatadirModification { + tline: self, + pending_lsns: Vec::new(), + pending_metadata_pages: HashMap::new(), + pending_data_batch: None, + pending_deletions: Vec::new(), + pending_nblocks: 0, + pending_directory_entries: Vec::new(), + pending_metadata_bytes: 0, + is_importing_pgdata: true, lsn, } } @@ -596,6 +615,50 @@ impl Timeline { self.get_rel_exists_in_reldir(tag, version, None, ctx).await } + async fn get_rel_exists_in_reldir_v1( + &self, + tag: RelTag, + version: Version<'_>, + deserialized_reldir_v1: Option<(Key, &RelDirectory)>, + ctx: &RequestContext, + ) -> Result { + let key = rel_dir_to_key(tag.spcnode, tag.dbnode); + if let Some((cached_key, dir)) = deserialized_reldir_v1 { + if cached_key == key { + return Ok(dir.rels.contains(&(tag.relnode, tag.forknum))); + } else if cfg!(test) || cfg!(feature = "testing") { + panic!("cached reldir key mismatch: {cached_key} != {key}"); + } else { + warn!("cached reldir key mismatch: {cached_key} != {key}"); + } + // Fallback to reading the directory from the datadir. + } + + let buf = version.get(self, key, ctx).await?; + + let dir = RelDirectory::des(&buf)?; + Ok(dir.rels.contains(&(tag.relnode, tag.forknum))) + } + + async fn get_rel_exists_in_reldir_v2( + &self, + tag: RelTag, + version: Version<'_>, + ctx: &RequestContext, + ) -> Result { + let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum); + let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?).map_err( + |_| { + PageReconstructError::Other(anyhow::anyhow!( + "invalid reldir key: decode failed, {}", + key + )) + }, + )?; + let exists_v2 = buf == RelDirExists::Exists; + Ok(exists_v2) + } + /// Does the relation exist? With a cached deserialized `RelDirectory`. 
/// /// There are some cases where the caller loops across all relations. In that specific case, @@ -627,45 +690,134 @@ impl Timeline { return Ok(false); } - // Read path: first read the new reldir keyspace. Early return if the relation exists. - // Otherwise, read the old reldir keyspace. - // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2. + let (v2_status, migrated_lsn) = self.get_rel_size_v2_status(); - if let RelSizeMigration::Migrated | RelSizeMigration::Migrating = - self.get_rel_size_v2_status() - { - // fetch directory listing (new) - let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum); - let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?) - .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?; - let exists_v2 = buf == RelDirExists::Exists; - // Fast path: if the relation exists in the new format, return true. - // TODO: we should have a verification mode that checks both keyspaces - // to ensure the relation only exists in one of them. - if exists_v2 { - return Ok(true); + match v2_status { + RelSizeMigration::Legacy => { + let v1_exists = self + .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx) + .await?; + Ok(v1_exists) + } + RelSizeMigration::Migrating | RelSizeMigration::Migrated + if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) => + { + // For requests below the migrated LSN, we still use the v1 read path. + let v1_exists = self + .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx) + .await?; + Ok(v1_exists) + } + RelSizeMigration::Migrating => { + let v1_exists = self + .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx) + .await?; + let v2_exists_res = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await; + match v2_exists_res { + Ok(v2_exists) if v1_exists == v2_exists => {} + Ok(v2_exists) => { + tracing::warn!( + "inconsistent v1/v2 reldir keyspace for rel {}: v1_exists={}, v2_exists={}", + tag, + v1_exists, + v2_exists + ); + } + Err(e) => { + tracing::warn!("failed to get rel exists in v2: {e}"); + } + } + Ok(v1_exists) + } + RelSizeMigration::Migrated => { + let v2_exists = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await?; + Ok(v2_exists) } } + } - // fetch directory listing (old) - - let key = rel_dir_to_key(tag.spcnode, tag.dbnode); - - if let Some((cached_key, dir)) = deserialized_reldir_v1 { - if cached_key == key { - return Ok(dir.rels.contains(&(tag.relnode, tag.forknum))); - } else if cfg!(test) || cfg!(feature = "testing") { - panic!("cached reldir key mismatch: {cached_key} != {key}"); - } else { - warn!("cached reldir key mismatch: {cached_key} != {key}"); - } - // Fallback to reading the directory from the datadir. 
- } + async fn list_rels_v1( + &self, + spcnode: Oid, + dbnode: Oid, + version: Version<'_>, + ctx: &RequestContext, + ) -> Result, PageReconstructError> { + let key = rel_dir_to_key(spcnode, dbnode); let buf = version.get(self, key, ctx).await?; - let dir = RelDirectory::des(&buf)?; - let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum)); - Ok(exists_v1) + let rels_v1: HashSet = + HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag { + spcnode, + dbnode, + relnode: *relnode, + forknum: *forknum, + })); + Ok(rels_v1) + } + + async fn list_rels_v2( + &self, + spcnode: Oid, + dbnode: Oid, + version: Version<'_>, + ctx: &RequestContext, + ) -> Result, PageReconstructError> { + let key_range = rel_tag_sparse_key_range(spcnode, dbnode); + let io_concurrency = IoConcurrency::spawn_from_conf( + self.conf.get_vectored_concurrent_io, + self.gate + .enter() + .map_err(|_| PageReconstructError::Cancelled)?, + ); + let results = self + .scan( + KeySpace::single(key_range), + version.get_lsn(), + ctx, + io_concurrency, + ) + .await?; + let mut rels = HashSet::new(); + for (key, val) in results { + let val = RelDirExists::decode(&val?).map_err(|_| { + PageReconstructError::Other(anyhow::anyhow!( + "invalid reldir key: decode failed, {}", + key + )) + })?; + if key.field6 != 1 { + return Err(PageReconstructError::Other(anyhow::anyhow!( + "invalid reldir key: field6 != 1, {}", + key + ))); + } + if key.field2 != spcnode { + return Err(PageReconstructError::Other(anyhow::anyhow!( + "invalid reldir key: field2 != spcnode, {}", + key + ))); + } + if key.field3 != dbnode { + return Err(PageReconstructError::Other(anyhow::anyhow!( + "invalid reldir key: field3 != dbnode, {}", + key + ))); + } + let tag = RelTag { + spcnode, + dbnode, + relnode: key.field4, + forknum: key.field5, + }; + if val == RelDirExists::Removed { + debug_assert!(!rels.contains(&tag), "removed reltag in v2"); + continue; + } + let did_not_contain = rels.insert(tag); + debug_assert!(did_not_contain, "duplicate reltag in v2"); + } + Ok(rels) } /// Get a list of all existing relations in given tablespace and database. @@ -683,60 +835,45 @@ impl Timeline { version: Version<'_>, ctx: &RequestContext, ) -> Result, PageReconstructError> { - // fetch directory listing (old) - let key = rel_dir_to_key(spcnode, dbnode); - let buf = version.get(self, key, ctx).await?; + let (v2_status, migrated_lsn) = self.get_rel_size_v2_status(); - let dir = RelDirectory::des(&buf)?; - let rels_v1: HashSet = - HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag { - spcnode, - dbnode, - relnode: *relnode, - forknum: *forknum, - })); - - if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() { - return Ok(rels_v1); - } - - // scan directory listing (new), merge with the old results - let key_range = rel_tag_sparse_key_range(spcnode, dbnode); - let io_concurrency = IoConcurrency::spawn_from_conf( - self.conf.get_vectored_concurrent_io, - self.gate - .enter() - .map_err(|_| PageReconstructError::Cancelled)?, - ); - let results = self - .scan( - KeySpace::single(key_range), - version.get_lsn(), - ctx, - io_concurrency, - ) - .await?; - let mut rels = rels_v1; - for (key, val) in results { - let val = RelDirExists::decode(&val?) 
- .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?; - assert_eq!(key.field6, 1); - assert_eq!(key.field2, spcnode); - assert_eq!(key.field3, dbnode); - let tag = RelTag { - spcnode, - dbnode, - relnode: key.field4, - forknum: key.field5, - }; - if val == RelDirExists::Removed { - debug_assert!(!rels.contains(&tag), "removed reltag in v2"); - continue; + match v2_status { + RelSizeMigration::Legacy => { + let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?; + Ok(rels_v1) + } + RelSizeMigration::Migrating | RelSizeMigration::Migrated + if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) => + { + // For requests below the migrated LSN, we still use the v1 read path. + let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?; + Ok(rels_v1) + } + RelSizeMigration::Migrating => { + let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?; + let rels_v2_res = self.list_rels_v2(spcnode, dbnode, version, ctx).await; + match rels_v2_res { + Ok(rels_v2) if rels_v1 == rels_v2 => {} + Ok(rels_v2) => { + tracing::warn!( + "inconsistent v1/v2 reldir keyspace for db {} {}: v1_rels.len()={}, v2_rels.len()={}", + spcnode, + dbnode, + rels_v1.len(), + rels_v2.len() + ); + } + Err(e) => { + tracing::warn!("failed to list rels in v2: {e}"); + } + } + Ok(rels_v1) + } + RelSizeMigration::Migrated => { + let rels_v2 = self.list_rels_v2(spcnode, dbnode, version, ctx).await?; + Ok(rels_v2) } - let did_not_contain = rels.insert(tag); - debug_assert!(did_not_contain, "duplicate reltag in v2"); } - Ok(rels) } /// Get the whole SLRU segment @@ -1258,10 +1395,10 @@ impl Timeline { let mut dbdir_cnt = 0; let mut rel_cnt = 0; - for (spcnode, dbnode) in dbdir.dbdirs.keys() { + for &(spcnode, dbnode) in dbdir.dbdirs.keys() { dbdir_cnt += 1; for rel in self - .list_rels(*spcnode, *dbnode, Version::at(lsn), ctx) + .list_rels(spcnode, dbnode, Version::at(lsn), ctx) .await? { rel_cnt += 1; @@ -1566,6 +1703,9 @@ pub struct DatadirModification<'a> { /// An **approximation** of how many metadata bytes will be written to the EphemeralFile. pending_metadata_bytes: usize, + + /// Whether we are importing a pgdata directory. + is_importing_pgdata: bool, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -1578,6 +1718,14 @@ pub enum MetricsUpdate { Sub(u64), } +/// Controls the behavior of the reldir keyspace. +pub struct RelDirMode { + // Whether we can read the v2 keyspace or not. + current_status: RelSizeMigration, + // Whether we should initialize the v2 keyspace or not. + initialize: bool, +} + impl DatadirModification<'_> { // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we @@ -1933,30 +2081,49 @@ impl DatadirModification<'_> { } /// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that - /// we enable it, we also need to persist it in `index_part.json`. - pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result { - let status = self.tline.get_rel_size_v2_status(); + /// we enable it, we also need to persist it in `index_part.json` (initialize is true). + /// + /// As this function is only used on the write path, we do not need to read the migrated_at + /// field. 
+ pub fn maybe_enable_rel_size_v2(&mut self, is_create: bool) -> anyhow::Result { + // TODO: define the behavior of the tenant-level config flag and use feature flag to enable this feature + + let (status, _) = self.tline.get_rel_size_v2_status(); let config = self.tline.get_rel_size_v2_enabled(); match (config, status) { (false, RelSizeMigration::Legacy) => { // tenant config didn't enable it and we didn't write any reldir_v2 key yet - Ok(false) + Ok(RelDirMode { + current_status: RelSizeMigration::Legacy, + initialize: false, + }) } - (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => { + (false, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => { // index_part already persisted that the timeline has enabled rel_size_v2 - Ok(true) + Ok(RelDirMode { + current_status: status, + initialize: false, + }) } (true, RelSizeMigration::Legacy) => { // The first time we enable it, we need to persist it in `index_part.json` - self.tline - .update_rel_size_v2_status(RelSizeMigration::Migrating)?; - tracing::info!("enabled rel_size_v2"); - Ok(true) + // The caller should update the reldir status once the initialization is done. + // + // Only initialize the v2 keyspace on new relation creation. No initialization + // during `timeline_create` (TODO: fix this, we should allow, but currently it + // hits consistency issues). + Ok(RelDirMode { + current_status: RelSizeMigration::Legacy, + initialize: is_create && !self.is_importing_pgdata, + }) } - (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => { + (true, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => { // index_part already persisted that the timeline has enabled rel_size_v2 // and we don't need to do anything - Ok(true) + Ok(RelDirMode { + current_status: status, + initialize: false, + }) } } } @@ -1969,8 +2136,8 @@ impl DatadirModification<'_> { img: Bytes, ctx: &RequestContext, ) -> Result<(), WalIngestError> { - let v2_enabled = self - .maybe_enable_rel_size_v2() + let v2_mode = self + .maybe_enable_rel_size_v2(false) .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?; // Add it to the directory (if it doesn't exist already) @@ -1986,17 +2153,19 @@ impl DatadirModification<'_> { self.put(DBDIR_KEY, Value::Image(buf.into())); } if r.is_none() { - // Create RelDirectory - // TODO: if we have fully migrated to v2, no need to create this directory + if v2_mode.current_status != RelSizeMigration::Legacy { + self.pending_directory_entries + .push((DirectoryKind::RelV2, MetricsUpdate::Set(0))); + } + + // Create RelDirectory in v1 keyspace. TODO: if we have fully migrated to v2, no need to create this directory. + // Some code path relies on this directory to be present. We should remove it once we starts to set tenants to + // `RelSizeMigration::Migrated` state (currently we don't, all tenants will have `RelSizeMigration::Migrating`). 
let buf = RelDirectory::ser(&RelDirectory { rels: HashSet::new(), })?; self.pending_directory_entries .push((DirectoryKind::Rel, MetricsUpdate::Set(0))); - if v2_enabled { - self.pending_directory_entries - .push((DirectoryKind::RelV2, MetricsUpdate::Set(0))); - } self.put( rel_dir_to_key(spcnode, dbnode), Value::Image(Bytes::from(buf)), @@ -2103,6 +2272,109 @@ impl DatadirModification<'_> { Ok(()) } + async fn initialize_rel_size_v2_keyspace( + &mut self, + ctx: &RequestContext, + dbdir: &DbDirectory, + ) -> Result<(), WalIngestError> { + // Copy everything from relv1 to relv2; TODO: check if there's any key in the v2 keyspace, if so, abort. + tracing::info!("initializing rel_size_v2 keyspace"); + let mut rel_cnt = 0; + // relmap_exists (the value of dbdirs hashmap) does not affect the migration: we need to copy things over anyways + for &(spcnode, dbnode) in dbdir.dbdirs.keys() { + let rel_dir_key = rel_dir_to_key(spcnode, dbnode); + let rel_dir = RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?; + for (relnode, forknum) in rel_dir.rels { + let sparse_rel_dir_key = rel_tag_sparse_key(spcnode, dbnode, relnode, forknum); + self.put( + sparse_rel_dir_key, + Value::Image(RelDirExists::Exists.encode()), + ); + tracing::info!( + "migrated rel_size_v2: {}", + RelTag { + spcnode, + dbnode, + relnode, + forknum + } + ); + rel_cnt += 1; + } + } + tracing::info!( + "initialized rel_size_v2 keyspace at lsn {}: migrated {} relations", + self.lsn, + rel_cnt + ); + self.tline + .update_rel_size_v2_status(RelSizeMigration::Migrating, Some(self.lsn)) + .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?; + Ok::<_, WalIngestError>(()) + } + + async fn put_rel_creation_v1( + &mut self, + rel: RelTag, + dbdir_exists: bool, + ctx: &RequestContext, + ) -> Result<(), WalIngestError> { + // Reldir v1 write path + let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); + let mut rel_dir = if !dbdir_exists { + // Create the RelDirectory + RelDirectory::default() + } else { + // reldir already exists, fetch it + RelDirectory::des(&self.get(rel_dir_key, ctx).await?)? + }; + + // Add the new relation to the rel directory entry, and write it back + if !rel_dir.rels.insert((rel.relnode, rel.forknum)) { + Err(WalIngestErrorKind::RelationAlreadyExists(rel))?; + } + if !dbdir_exists { + self.pending_directory_entries + .push((DirectoryKind::Rel, MetricsUpdate::Set(0))) + } + self.pending_directory_entries + .push((DirectoryKind::Rel, MetricsUpdate::Add(1))); + self.put( + rel_dir_key, + Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)), + ); + Ok(()) + } + + async fn put_rel_creation_v2( + &mut self, + rel: RelTag, + dbdir_exists: bool, + ctx: &RequestContext, + ) -> Result<(), WalIngestError> { + // Reldir v2 write path + let sparse_rel_dir_key = + rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum); + // check if the rel_dir_key exists in v2 + let val = self.sparse_get(sparse_rel_dir_key, ctx).await?; + let val = RelDirExists::decode_option(val) + .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?; + if val == RelDirExists::Exists { + Err(WalIngestErrorKind::RelationAlreadyExists(rel))?; + } + self.put( + sparse_rel_dir_key, + Value::Image(RelDirExists::Exists.encode()), + ); + if !dbdir_exists { + self.pending_directory_entries + .push((DirectoryKind::RelV2, MetricsUpdate::Set(0))); + } + self.pending_directory_entries + .push((DirectoryKind::RelV2, MetricsUpdate::Add(1))); + Ok(()) + } + /// Create a relation fork. /// /// 'nblocks' is the initial size. 
@@ -2136,66 +2408,31 @@ impl DatadirModification<'_> { true }; - let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); - let mut rel_dir = if !dbdir_exists { - // Create the RelDirectory - RelDirectory::default() - } else { - // reldir already exists, fetch it - RelDirectory::des(&self.get(rel_dir_key, ctx).await?)? - }; - - let v2_enabled = self - .maybe_enable_rel_size_v2() + let mut v2_mode = self + .maybe_enable_rel_size_v2(true) .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?; - if v2_enabled { - if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) { - Err(WalIngestErrorKind::RelationAlreadyExists(rel))?; + if v2_mode.initialize { + if let Err(e) = self.initialize_rel_size_v2_keyspace(ctx, &dbdir).await { + tracing::warn!("error initializing rel_size_v2 keyspace: {}", e); + // TODO: circuit breaker so that it won't retry forever + } else { + v2_mode.current_status = RelSizeMigration::Migrating; } - let sparse_rel_dir_key = - rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum); - // check if the rel_dir_key exists in v2 - let val = self.sparse_get(sparse_rel_dir_key, ctx).await?; - let val = RelDirExists::decode_option(val) - .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?; - if val == RelDirExists::Exists { - Err(WalIngestErrorKind::RelationAlreadyExists(rel))?; + } + + if v2_mode.current_status != RelSizeMigration::Migrated { + self.put_rel_creation_v1(rel, dbdir_exists, ctx).await?; + } + + if v2_mode.current_status != RelSizeMigration::Legacy { + let write_v2_res = self.put_rel_creation_v2(rel, dbdir_exists, ctx).await; + if let Err(e) = write_v2_res { + if v2_mode.current_status == RelSizeMigration::Migrated { + return Err(e); + } + tracing::warn!("error writing rel_size_v2 keyspace: {}", e); } - self.put( - sparse_rel_dir_key, - Value::Image(RelDirExists::Exists.encode()), - ); - if !dbdir_exists { - self.pending_directory_entries - .push((DirectoryKind::Rel, MetricsUpdate::Set(0))); - self.pending_directory_entries - .push((DirectoryKind::RelV2, MetricsUpdate::Set(0))); - // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation. - // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there - // will be key not found errors if we don't create an empty one for rel_size_v2. 
- self.put( - rel_dir_key, - Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)), - ); - } - self.pending_directory_entries - .push((DirectoryKind::RelV2, MetricsUpdate::Add(1))); - } else { - // Add the new relation to the rel directory entry, and write it back - if !rel_dir.rels.insert((rel.relnode, rel.forknum)) { - Err(WalIngestErrorKind::RelationAlreadyExists(rel))?; - } - if !dbdir_exists { - self.pending_directory_entries - .push((DirectoryKind::Rel, MetricsUpdate::Set(0))) - } - self.pending_directory_entries - .push((DirectoryKind::Rel, MetricsUpdate::Add(1))); - self.put( - rel_dir_key, - Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)), - ); } // Put size @@ -2270,15 +2507,12 @@ impl DatadirModification<'_> { Ok(()) } - /// Drop some relations - pub(crate) async fn put_rel_drops( + async fn put_rel_drop_v1( &mut self, drop_relations: HashMap<(u32, u32), Vec>, ctx: &RequestContext, - ) -> Result<(), WalIngestError> { - let v2_enabled = self - .maybe_enable_rel_size_v2() - .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?; + ) -> Result, WalIngestError> { + let mut dropped_rels = BTreeSet::new(); for ((spc_node, db_node), rel_tags) in drop_relations { let dir_key = rel_dir_to_key(spc_node, db_node); let buf = self.get(dir_key, ctx).await?; @@ -2290,25 +2524,8 @@ impl DatadirModification<'_> { self.pending_directory_entries .push((DirectoryKind::Rel, MetricsUpdate::Sub(1))); dirty = true; + dropped_rels.insert(rel_tag); true - } else if v2_enabled { - // The rel is not found in the old reldir key, so we need to check the new sparse keyspace. - // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion - // logic). - let key = - rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum); - let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?) - .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?; - if val == RelDirExists::Exists { - self.pending_directory_entries - .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1))); - // put tombstone - self.put(key, Value::Image(RelDirExists::Removed.encode())); - // no need to set dirty to true - true - } else { - false - } } else { false }; @@ -2331,7 +2548,67 @@ impl DatadirModification<'_> { self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?))); } } + Ok(dropped_rels) + } + async fn put_rel_drop_v2( + &mut self, + drop_relations: HashMap<(u32, u32), Vec>, + ctx: &RequestContext, + ) -> Result, WalIngestError> { + let mut dropped_rels = BTreeSet::new(); + for ((spc_node, db_node), rel_tags) in drop_relations { + for rel_tag in rel_tags { + let key = rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum); + let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?) 
+ .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?; + if val == RelDirExists::Exists { + dropped_rels.insert(rel_tag); + self.pending_directory_entries + .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1))); + // put tombstone + self.put(key, Value::Image(RelDirExists::Removed.encode())); + } + } + } + Ok(dropped_rels) + } + + /// Drop some relations + pub(crate) async fn put_rel_drops( + &mut self, + drop_relations: HashMap<(u32, u32), Vec>, + ctx: &RequestContext, + ) -> Result<(), WalIngestError> { + let v2_mode = self + .maybe_enable_rel_size_v2(false) + .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?; + match v2_mode.current_status { + RelSizeMigration::Legacy => { + self.put_rel_drop_v1(drop_relations, ctx).await?; + } + RelSizeMigration::Migrating => { + let dropped_rels_v1 = self.put_rel_drop_v1(drop_relations.clone(), ctx).await?; + let dropped_rels_v2_res = self.put_rel_drop_v2(drop_relations, ctx).await; + match dropped_rels_v2_res { + Ok(dropped_rels_v2) => { + if dropped_rels_v1 != dropped_rels_v2 { + tracing::warn!( + "inconsistent v1/v2 rel drop: dropped_rels_v1.len()={}, dropped_rels_v2.len()={}", + dropped_rels_v1.len(), + dropped_rels_v2.len() + ); + } + } + Err(e) => { + tracing::warn!("error dropping rels: {}", e); + } + } + } + RelSizeMigration::Migrated => { + self.put_rel_drop_v2(drop_relations, ctx).await?; + } + } Ok(()) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 4c8856c386..91b717a2e9 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1205,6 +1205,7 @@ impl TenantShard { idempotency.clone(), index_part.gc_compaction.clone(), index_part.rel_size_migration.clone(), + index_part.rel_size_migrated_at, ctx, )?; let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); @@ -2584,6 +2585,7 @@ impl TenantShard { initdb_lsn, None, None, + None, ctx, ) .await @@ -2913,6 +2915,7 @@ impl TenantShard { initdb_lsn, None, None, + None, ctx, ) .await @@ -4342,6 +4345,7 @@ impl TenantShard { create_idempotency: CreateTimelineIdempotency, gc_compaction_state: Option, rel_size_v2_status: Option, + rel_size_migrated_at: Option, ctx: &RequestContext, ) -> anyhow::Result<(Arc, RequestContext)> { let state = match cause { @@ -4376,6 +4380,7 @@ impl TenantShard { create_idempotency, gc_compaction_state, rel_size_v2_status, + rel_size_migrated_at, self.cancel.child_token(), ); @@ -5085,6 +5090,7 @@ impl TenantShard { src_timeline.pg_version, ); + let (rel_size_v2_status, rel_size_migrated_at) = src_timeline.get_rel_size_v2_status(); let (uninitialized_timeline, _timeline_ctx) = self .prepare_new_timeline( dst_id, @@ -5092,7 +5098,8 @@ impl TenantShard { timeline_create_guard, start_lsn + 1, Some(Arc::clone(src_timeline)), - Some(src_timeline.get_rel_size_v2_status()), + Some(rel_size_v2_status), + rel_size_migrated_at, ctx, ) .await?; @@ -5379,6 +5386,7 @@ impl TenantShard { pgdata_lsn, None, None, + None, ctx, ) .await?; @@ -5462,14 +5470,17 @@ impl TenantShard { start_lsn: Lsn, ancestor: Option>, rel_size_v2_status: Option, + rel_size_migrated_at: Option, ctx: &RequestContext, ) -> anyhow::Result<(UninitializedTimeline<'a>, RequestContext)> { let tenant_shard_id = self.tenant_shard_id; let resources = self.build_timeline_resources(new_timeline_id); - resources - .remote_client - .init_upload_queue_for_empty_remote(new_metadata, rel_size_v2_status.clone())?; + resources.remote_client.init_upload_queue_for_empty_remote( + new_metadata, + rel_size_v2_status.clone(), + rel_size_migrated_at, + )?; let (timeline_struct, 
timeline_ctx) = self .create_timeline_struct( @@ -5482,6 +5493,7 @@ impl TenantShard { create_guard.idempotency.clone(), None, rel_size_v2_status, + rel_size_migrated_at, ctx, ) .context("Failed to create timeline data structure")?; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index fd65000379..6b650beb3f 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -443,7 +443,8 @@ impl RemoteTimelineClient { pub fn init_upload_queue_for_empty_remote( &self, local_metadata: &TimelineMetadata, - rel_size_v2_status: Option, + rel_size_v2_migration: Option, + rel_size_migrated_at: Option, ) -> anyhow::Result<()> { // Set the maximum number of inprogress tasks to the remote storage concurrency. There's // certainly no point in starting more upload tasks than this. @@ -455,7 +456,8 @@ impl RemoteTimelineClient { let mut upload_queue = self.upload_queue.lock().unwrap(); let initialized_queue = upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?; - initialized_queue.dirty.rel_size_migration = rel_size_v2_status; + initialized_queue.dirty.rel_size_migration = rel_size_v2_migration; + initialized_queue.dirty.rel_size_migrated_at = rel_size_migrated_at; self.update_remote_physical_size_gauge(None); info!("initialized upload queue as empty"); Ok(()) @@ -994,10 +996,12 @@ impl RemoteTimelineClient { pub(crate) fn schedule_index_upload_for_rel_size_v2_status_update( self: &Arc, rel_size_v2_status: RelSizeMigration, + rel_size_migrated_at: Option, ) -> anyhow::Result<()> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; upload_queue.dirty.rel_size_migration = Some(rel_size_v2_status); + upload_queue.dirty.rel_size_migrated_at = rel_size_migrated_at; // TODO: allow this operation to bypass the validation check because we might upload the index part // with no layers but the flag updated. For now, we just modify the index part in memory and the next // upload will include the flag. diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 6060c42cbb..9531e7f650 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -114,6 +114,11 @@ pub struct IndexPart { /// The timestamp when the timeline was marked invisible in synthetic size calculations. #[serde(skip_serializing_if = "Option::is_none", default)] pub(crate) marked_invisible_at: Option, + + /// The LSN at which we started the rel size migration. Accesses below this LSN should be + /// processed with the v1 read path. Usually this LSN should be set together with `rel_size_migration`. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub(crate) rel_size_migrated_at: Option, } #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] @@ -142,10 +147,12 @@ impl IndexPart { /// - 12: +l2_lsn /// - 13: +gc_compaction /// - 14: +marked_invisible_at - const LATEST_VERSION: usize = 14; + /// - 15: +rel_size_migrated_at + const LATEST_VERSION: usize = 15; // Versions we may see when reading from a bucket. 
- pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]; + pub const KNOWN_VERSIONS: &'static [usize] = + &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; pub const FILE_NAME: &'static str = "index_part.json"; @@ -165,6 +172,7 @@ impl IndexPart { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, } } @@ -475,6 +483,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -524,6 +533,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -574,6 +584,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -627,6 +638,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap(); @@ -675,6 +687,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -726,6 +739,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -782,6 +796,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -843,6 +858,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -905,6 +921,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -972,6 +989,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -1052,6 +1070,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -1133,6 +1152,7 @@ mod tests { l2_lsn: None, gc_compaction: None, marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -1220,6 +1240,7 @@ mod tests { last_completed_lsn: "0/16960E8".parse::().unwrap(), }), marked_invisible_at: None, + rel_size_migrated_at: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -1308,6 +1329,97 @@ mod tests { last_completed_lsn: "0/16960E8".parse::().unwrap(), }), marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")), + rel_size_migrated_at: None, + }; + + let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); + assert_eq!(part, expected); + } + + #[test] + fn v15_rel_size_migrated_at_is_parsed() { + let example = r#"{ + "version": 15, + "layer_metadata":{ + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 }, + 
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 } + }, + "disk_consistent_lsn":"0/16960E8", + "metadata": { + "disk_consistent_lsn": "0/16960E8", + "prev_record_lsn": "0/1696070", + "ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e", + "ancestor_lsn": "0/0", + "latest_gc_cutoff_lsn": "0/1696070", + "initdb_lsn": "0/1696070", + "pg_version": 14 + }, + "gc_blocking": { + "started_at": "2024-07-19T09:00:00.123", + "reasons": ["DetachAncestor"] + }, + "import_pgdata": { + "V1": { + "Done": { + "idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5", + "started_at": "2024-11-13T09:23:42.123", + "finished_at": "2024-11-13T09:42:23.123" + } + } + }, + "rel_size_migration": "legacy", + "l2_lsn": "0/16960E8", + "gc_compaction": { + "last_completed_lsn": "0/16960E8" + }, + "marked_invisible_at": "2023-07-31T09:00:00.123", + "rel_size_migrated_at": "0/16960E8" + }"#; + + let expected = IndexPart { + version: 15, + layer_metadata: HashMap::from([ + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata { + file_size: 25600000, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata { + file_size: 9007199254741001, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }) + ]), + disk_consistent_lsn: "0/16960E8".parse::().unwrap(), + metadata: TimelineMetadata::new( + Lsn::from_str("0/16960E8").unwrap(), + Some(Lsn::from_str("0/1696070").unwrap()), + Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()), + Lsn::INVALID, + Lsn::from_str("0/1696070").unwrap(), + Lsn::from_str("0/1696070").unwrap(), + PgMajorVersion::PG14, + ).with_recalculated_checksum().unwrap(), + deleted_at: None, + lineage: Default::default(), + gc_blocking: Some(GcBlocking { + started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"), + reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]), + }), + last_aux_file_policy: Default::default(), + archived_at: None, + import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{ + started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"), + finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"), + idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()), + }))), + rel_size_migration: Some(RelSizeMigration::Legacy), + l2_lsn: Some("0/16960E8".parse::().unwrap()), + gc_compaction: Some(GcCompactionState { + last_completed_lsn: "0/16960E8".parse::().unwrap(), + }), + marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")), + rel_size_migrated_at: Some("0/16960E8".parse::().unwrap()), }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 3ef07aa414..483e9d9a2a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -441,7 +441,7 @@ pub struct Timeline { /// heatmap on demand. 
heatmap_layers_downloader: Mutex>, - pub(crate) rel_size_v2_status: ArcSwapOption, + pub(crate) rel_size_v2_status: ArcSwap<(Option, Option)>, wait_lsn_log_slow: tokio::sync::Semaphore, @@ -2894,12 +2894,9 @@ impl Timeline { .unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled) } - pub(crate) fn get_rel_size_v2_status(&self) -> RelSizeMigration { - self.rel_size_v2_status - .load() - .as_ref() - .map(|s| s.as_ref().clone()) - .unwrap_or(RelSizeMigration::Legacy) + pub(crate) fn get_rel_size_v2_status(&self) -> (RelSizeMigration, Option) { + let (status, migrated_at) = self.rel_size_v2_status.load().as_ref().clone(); + (status.unwrap_or(RelSizeMigration::Legacy), migrated_at) } fn get_compaction_upper_limit(&self) -> usize { @@ -3174,6 +3171,7 @@ impl Timeline { create_idempotency: crate::tenant::CreateTimelineIdempotency, gc_compaction_state: Option, rel_size_v2_status: Option, + rel_size_migrated_at: Option, cancel: CancellationToken, ) -> Arc { let disk_consistent_lsn = metadata.disk_consistent_lsn(); @@ -3338,7 +3336,10 @@ impl Timeline { heatmap_layers_downloader: Mutex::new(None), - rel_size_v2_status: ArcSwapOption::from_pointee(rel_size_v2_status), + rel_size_v2_status: ArcSwap::from_pointee(( + rel_size_v2_status, + rel_size_migrated_at, + )), wait_lsn_log_slow: tokio::sync::Semaphore::new(1), @@ -3426,11 +3427,17 @@ impl Timeline { pub(crate) fn update_rel_size_v2_status( &self, rel_size_v2_status: RelSizeMigration, + rel_size_migrated_at: Option, ) -> anyhow::Result<()> { - self.rel_size_v2_status - .store(Some(Arc::new(rel_size_v2_status.clone()))); + self.rel_size_v2_status.store(Arc::new(( + Some(rel_size_v2_status.clone()), + rel_size_migrated_at, + ))); self.remote_client - .schedule_index_upload_for_rel_size_v2_status_update(rel_size_v2_status) + .schedule_index_upload_for_rel_size_v2_status_update( + rel_size_v2_status, + rel_size_migrated_at, + ) } pub(crate) fn get_gc_compaction_state(&self) -> Option { diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index f7dc44be90..2f6eccdbf9 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -332,6 +332,7 @@ impl DeleteTimelineFlow { crate::tenant::CreateTimelineIdempotency::FailWithConflict, // doesn't matter what we put here None, // doesn't matter what we put here None, // doesn't matter what we put here + None, // doesn't matter what we put here ctx, ) .context("create_timeline_struct")?; diff --git a/test_runner/performance/test_perf_many_relations.py b/test_runner/performance/test_perf_many_relations.py index 81dae53759..da76c3ec86 100644 --- a/test_runner/performance/test_perf_many_relations.py +++ b/test_runner/performance/test_perf_many_relations.py @@ -5,7 +5,7 @@ import pytest from fixtures.benchmark_fixture import NeonBenchmarker from fixtures.compare_fixtures import RemoteCompare from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn from fixtures.utils import shared_buffers_for_max_cu @@ -69,13 +69,20 @@ def test_perf_many_relations(remote_compare: RemoteCompare, num_relations: int): ) -def test_perf_simple_many_relations_reldir_v2( - neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker +@pytest.mark.parametrize( + "reldir,num_relations", + [("v1", 10000), ("v1v2", 10000), ("v2", 10000), ("v2", 100000)], + ids=["v1-small", "v1v2-small", "v2-small", "v2-large"], +) +def 
test_perf_simple_many_relations_reldir( + neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, reldir: str, num_relations: int ): """ Test creating many relations in a single database. """ - env = neon_env_builder.init_start(initial_tenant_conf={"rel_size_v2_enabled": "true"}) + env = neon_env_builder.init_start( + initial_tenant_conf={"rel_size_v2_enabled": "true" if reldir != "v1" else "false"} + ) ep = env.endpoints.create_start( "main", config_lines=[ @@ -85,14 +92,38 @@ def test_perf_simple_many_relations_reldir_v2( ], ) - assert ( - env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[ - "rel_size_migration" - ] - != "legacy" - ) + ep.safe_psql("CREATE TABLE IF NOT EXISTS initial_table (v1 int)") + wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline) - n = 100000 + if reldir == "v1": + assert ( + env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[ + "rel_size_migration" + ] + == "legacy" + ) + elif reldir == "v1v2": + assert ( + env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[ + "rel_size_migration" + ] + == "migrating" + ) + elif reldir == "v2": + # only read/write to the v2 keyspace + env.pageserver.http_client().timeline_patch_index_part( + env.initial_tenant, env.initial_timeline, {"rel_size_migration": "migrated"} + ) + assert ( + env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[ + "rel_size_migration" + ] + == "migrated" + ) + else: + raise AssertionError(f"Invalid reldir config: {reldir}") + + n = num_relations step = 5000 # Create many relations log.info(f"Creating {n} relations...") diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 734887c5b3..635a040800 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -538,6 +538,7 @@ def test_historic_storage_formats( neon_env_builder.enable_pageserver_remote_storage(s3_storage()) neon_env_builder.pg_version = dataset.pg_version env = neon_env_builder.init_configs() + env.start() assert isinstance(env.pageserver_remote_storage, S3Storage) @@ -576,6 +577,17 @@ def test_historic_storage_formats( # All our artifacts should contain at least one timeline assert len(timelines) > 0 + if dataset.name == "2025-04-08-tenant-manifest-v1": + # This dataset was created at a time where we decided to migrate to v2 reldir by simply disabling writes to v1 + # and starting writing to v2. This was too risky and we have reworked the migration plan. Therefore, we should + # opt in full relv2 mode for this dataset. + for timeline in timelines: + env.pageserver.http_client().timeline_patch_index_part( + dataset.tenant_id, + timeline["timeline_id"], + {"force_index_update": True, "rel_size_migration": "migrated"}, + ) + # Import tenant does not create the timeline on safekeepers, # because it is a debug handler and the timeline may have already been # created on some set of safekeepers. 
diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py index dd9c5437ad..e7c7abf397 100644 --- a/test_runner/regress/test_pg_regress.py +++ b/test_runner/regress/test_pg_regress.py @@ -457,21 +457,6 @@ def test_tx_abort_with_many_relations( ], ) - if reldir_type == "v1": - assert ( - env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[ - "rel_size_migration" - ] - == "legacy" - ) - else: - assert ( - env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[ - "rel_size_migration" - ] - != "legacy" - ) - # How many relations: this number is tuned to be long enough to take tens of seconds # if the rollback code path is buggy, tripping the test's timeout. n = 5000 @@ -556,3 +541,19 @@ def test_tx_abort_with_many_relations( except: exec.shutdown(wait=False, cancel_futures=True) raise + + # Do the check after everything is done, because the reldirv2 transition won't happen until create table. + if reldir_type == "v1": + assert ( + env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[ + "rel_size_migration" + ] + == "legacy" + ) + else: + assert ( + env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[ + "rel_size_migration" + ] + != "legacy" + ) diff --git a/test_runner/regress/test_relations.py b/test_runner/regress/test_relations.py index b2ddcb1c2e..6263ced9df 100644 --- a/test_runner/regress/test_relations.py +++ b/test_runner/regress/test_relations.py @@ -7,6 +7,8 @@ if TYPE_CHECKING: NeonEnvBuilder, ) +from fixtures.neon_fixtures import wait_for_last_flush_lsn + def test_pageserver_reldir_v2( neon_env_builder: NeonEnvBuilder, @@ -65,6 +67,8 @@ def test_pageserver_reldir_v2( endpoint.safe_psql("CREATE TABLE foo4 (id INTEGER PRIMARY KEY, val text)") # Delete a relation in v1 endpoint.safe_psql("DROP TABLE foo1") + # wait pageserver to apply the LSN + wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline) # Check if both relations are still accessible endpoint.safe_psql("SELECT * FROM foo2") @@ -76,12 +80,16 @@ def test_pageserver_reldir_v2( # This will acquire a basebackup, which lists all relations. endpoint.start() - # Check if both relations are still accessible + # Check if both relations are still accessible after restart endpoint.safe_psql("DROP TABLE IF EXISTS foo1") endpoint.safe_psql("SELECT * FROM foo2") endpoint.safe_psql("SELECT * FROM foo3") endpoint.safe_psql("SELECT * FROM foo4") endpoint.safe_psql("DROP TABLE foo3") + # wait pageserver to apply the LSN + wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline) + + # Restart the endpoint again endpoint.stop() endpoint.start() @@ -99,6 +107,9 @@ def test_pageserver_reldir_v2( }, ) + endpoint.stop() + endpoint.start() + # Check if the relation is still accessible endpoint.safe_psql("SELECT * FROM foo2") endpoint.safe_psql("SELECT * FROM foo4") @@ -111,3 +122,10 @@ def test_pageserver_reldir_v2( ] == "migrating" ) + + assert ( + env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[ + "rel_size_migrated_at" + ] + is not None + )
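For manual verification of the new fields, a minimal pytest-style sketch in the spirit of the tests above may help: it drives the patch-index-part path added in `routes.rs` and reads the result back through `timeline_detail`. The test name, the literal LSN, and the assumption that `Lsn` values are accepted in the usual `X/Y` string form are illustrative only; the fixture helpers (`timeline_patch_index_part`, `timeline_detail`, `init_start`) are the same ones the diff itself uses.

```python
from fixtures.neon_fixtures import NeonEnvBuilder


def test_patch_rel_size_migrated_at_sketch(neon_env_builder: NeonEnvBuilder):
    # Hypothetical test, not part of this change.
    env = neon_env_builder.init_start(initial_tenant_conf={"rel_size_v2_enabled": "true"})
    client = env.pageserver.http_client()

    # timeline_patch_index_part_handler rejects rel_size_migrated_at on its own
    # ("updating rel_size_migrated_at without rel_size_migration is not allowed"),
    # so the two fields are always patched together.
    client.timeline_patch_index_part(
        env.initial_tenant,
        env.initial_timeline,
        {
            "rel_size_migration": "migrated",
            # Assumed LSN string encoding; reads below this LSN stay on the v1
            # read path, reads at or above it use the v2 keyspace.
            "rel_size_migrated_at": "0/16960E8",
        },
    )

    detail = client.timeline_detail(env.initial_tenant, env.initial_timeline)
    assert detail["rel_size_migration"] == "migrated"
    assert detail["rel_size_migrated_at"] is not None
```

The other way `rel_size_migrated_at` gets populated is internally, when `initialize_rel_size_v2_keyspace` calls `update_rel_size_v2_status(RelSizeMigration::Migrating, Some(self.lsn))` on the first relation creation after `rel_size_v2_enabled` is turned on; `test_pageserver_reldir_v2` above asserts that case.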