|
|
|
|
@@ -16,10 +16,10 @@ use anyhow::Context;
|
|
|
|
|
use bytes::{Buf, Bytes, BytesMut};
|
|
|
|
|
use enum_map::Enum;
|
|
|
|
|
use pageserver_api::key::{
|
|
|
|
|
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
|
|
|
|
|
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
|
|
|
|
|
rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
|
|
|
|
|
repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
|
|
|
|
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key,
|
|
|
|
|
REL_DIR_MIGRATION_KEY, RelDirExists, TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key,
|
|
|
|
|
rel_dir_to_key, rel_key_range, rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range,
|
|
|
|
|
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
|
|
|
|
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
|
|
|
|
};
|
|
|
|
|
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
|
|
|
|
|
@@ -691,7 +691,10 @@ impl Timeline {
|
|
|
|
|
return Ok(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let v2_status = dbdir.get_persistent_rel_size_v2_status();
|
|
|
|
|
let migration_history = version.sparse_get(self, REL_DIR_MIGRATION_KEY, ctx).await?;
|
|
|
|
|
let migration_history = RelDirMigrationHistory::from_bytes(migration_history)
|
|
|
|
|
.context("failed to deserialize rel dir migration history")?;
|
|
|
|
|
let v2_status = migration_history.status;
|
|
|
|
|
|
|
|
|
|
match v2_status {
|
|
|
|
|
RelSizeMigration::Legacy => {
|
|
|
|
|
@@ -834,8 +837,10 @@ impl Timeline {
|
|
|
|
|
version: Version<'_>,
|
|
|
|
|
ctx: &RequestContext,
|
|
|
|
|
) -> Result<HashSet<RelTag>, PageReconstructError> {
|
|
|
|
|
let dbdir = DbDirectory::des(&version.get(self, DBDIR_KEY, ctx).await?)?;
|
|
|
|
|
let v2_status = dbdir.get_persistent_rel_size_v2_status();
|
|
|
|
|
let reldir_migration_history = version.sparse_get(self, REL_DIR_MIGRATION_KEY, ctx).await?;
|
|
|
|
|
let reldir_migration_history = RelDirMigrationHistory::from_bytes(reldir_migration_history)
|
|
|
|
|
.context("failed to deserialize rel dir migration history")?;
|
|
|
|
|
let v2_status = reldir_migration_history.status;
|
|
|
|
|
|
|
|
|
|
match v2_status {
|
|
|
|
|
RelSizeMigration::Legacy => {
|
|
|
|
|
@@ -1802,7 +1807,6 @@ impl DatadirModification<'_> {
|
|
|
|
|
pub fn init_empty(&mut self) -> anyhow::Result<()> {
|
|
|
|
|
let buf = DbDirectory::ser(&DbDirectory {
|
|
|
|
|
dbdirs: HashMap::new(),
|
|
|
|
|
rel_dir_migration_status: None,
|
|
|
|
|
})?;
|
|
|
|
|
self.pending_directory_entries
|
|
|
|
|
.push((DirectoryKind::Db, MetricsUpdate::Set(0)));
|
|
|
|
|
@@ -2086,12 +2090,12 @@ impl DatadirModification<'_> {
|
|
|
|
|
/// field.
|
|
|
|
|
pub(crate) fn maybe_enable_rel_size_v2(
|
|
|
|
|
&mut self,
|
|
|
|
|
dbdir: &DbDirectory,
|
|
|
|
|
migration_history: &RelDirMigrationHistory,
|
|
|
|
|
is_create: bool,
|
|
|
|
|
) -> anyhow::Result<RelDirMode> {
|
|
|
|
|
// TODO: define the behavior of the tenant-level config flag and use feature flag to enable this feature
|
|
|
|
|
let expected_status = self.tline.get_rel_size_v2_expected_state();
|
|
|
|
|
let persistent_status = dbdir.get_persistent_rel_size_v2_status();
|
|
|
|
|
let persistent_status = migration_history.status.clone();
|
|
|
|
|
|
|
|
|
|
// Only initialize the v2 keyspace on new relation creation. No initialization
|
|
|
|
|
// during `timeline_create` (TODO: fix this, we should allow, but currently it
|
|
|
|
|
@@ -2166,9 +2170,14 @@ impl DatadirModification<'_> {
|
|
|
|
|
let buf = self.get(DBDIR_KEY, ctx).await?;
|
|
|
|
|
let mut dbdir = DbDirectory::des(&buf)?;
|
|
|
|
|
|
|
|
|
|
let reldir_migration_history = self.sparse_get(REL_DIR_MIGRATION_KEY, ctx).await?;
|
|
|
|
|
let reldir_migration_history = RelDirMigrationHistory::from_bytes(reldir_migration_history)
|
|
|
|
|
.context("failed to deserialize rel dir migration history")
|
|
|
|
|
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
|
|
|
|
|
|
|
|
|
let v2_mode = self
|
|
|
|
|
.maybe_enable_rel_size_v2(&dbdir, false)
|
|
|
|
|
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
|
|
|
|
.maybe_enable_rel_size_v2(&reldir_migration_history, false)
|
|
|
|
|
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
|
|
|
|
|
|
|
|
|
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
|
|
|
|
|
if r.is_none() || r == Some(false) {
|
|
|
|
|
@@ -2406,9 +2415,14 @@ impl DatadirModification<'_> {
|
|
|
|
|
// It's possible that this is the first rel for this db in this
|
|
|
|
|
// tablespace. Create the reldir entry for it if so.
|
|
|
|
|
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
|
|
|
|
|
let reldir_migration_history = self.sparse_get(REL_DIR_MIGRATION_KEY, ctx).await?;
|
|
|
|
|
let mut reldir_migration_history =
|
|
|
|
|
RelDirMigrationHistory::from_bytes(reldir_migration_history)
|
|
|
|
|
.context("failed to deserialize rel dir migration history")
|
|
|
|
|
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
|
|
|
|
|
|
|
|
|
let mut is_dbdir_dirty = false;
|
|
|
|
|
let mut is_reldirv2_index_part_dirty = false;
|
|
|
|
|
let mut is_reldirv2_status_dirty = false;
|
|
|
|
|
|
|
|
|
|
let dbdir_exists =
|
|
|
|
|
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
|
|
|
|
|
@@ -2425,8 +2439,8 @@ impl DatadirModification<'_> {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut v2_mode = self
|
|
|
|
|
.maybe_enable_rel_size_v2(&dbdir, true)
|
|
|
|
|
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
|
|
|
|
.maybe_enable_rel_size_v2(&reldir_migration_history, true)
|
|
|
|
|
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
|
|
|
|
|
|
|
|
|
if v2_mode.initialize {
|
|
|
|
|
if let Err(e) = self.initialize_rel_size_v2_keyspace(ctx, &dbdir).await {
|
|
|
|
|
@@ -2434,20 +2448,16 @@ impl DatadirModification<'_> {
|
|
|
|
|
// TODO: circuit breaker so that it won't retry forever
|
|
|
|
|
} else {
|
|
|
|
|
v2_mode.current_status = RelSizeMigration::Migrating;
|
|
|
|
|
let migration_history = dbdir.rel_dir_migration_status.get_or_insert_default();
|
|
|
|
|
migration_history.status = Some(RelSizeMigration::Migrating);
|
|
|
|
|
migration_history.v2_enabled_at = Some(self.lsn);
|
|
|
|
|
is_dbdir_dirty = true;
|
|
|
|
|
is_reldirv2_index_part_dirty = true;
|
|
|
|
|
reldir_migration_history.status = RelSizeMigration::Migrating;
|
|
|
|
|
reldir_migration_history.v2_enabled_at = Some(self.lsn);
|
|
|
|
|
is_reldirv2_status_dirty = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if v2_mode.disable_v1 {
|
|
|
|
|
v2_mode.current_status = RelSizeMigration::Migrated;
|
|
|
|
|
let migration_history = dbdir.rel_dir_migration_status.get_or_insert_default();
|
|
|
|
|
migration_history.status = Some(RelSizeMigration::Migrated);
|
|
|
|
|
migration_history.v1_disabled_at = Some(self.lsn);
|
|
|
|
|
is_dbdir_dirty = true;
|
|
|
|
|
is_reldirv2_index_part_dirty = true;
|
|
|
|
|
reldir_migration_history.status = RelSizeMigration::Migrated;
|
|
|
|
|
reldir_migration_history.v1_disabled_at = Some(self.lsn);
|
|
|
|
|
is_reldirv2_status_dirty = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if is_dbdir_dirty {
|
|
|
|
|
@@ -2455,13 +2465,22 @@ impl DatadirModification<'_> {
|
|
|
|
|
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if is_reldirv2_index_part_dirty {
|
|
|
|
|
if is_reldirv2_status_dirty {
|
|
|
|
|
self.tline
|
|
|
|
|
.update_rel_size_v2_status(
|
|
|
|
|
dbdir.get_persistent_rel_size_v2_status(),
|
|
|
|
|
dbdir.get_persistent_rel_size_v2_migrated_at(),
|
|
|
|
|
reldir_migration_history.status.clone(),
|
|
|
|
|
reldir_migration_history.v1_disabled_at,
|
|
|
|
|
)
|
|
|
|
|
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
|
|
|
|
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
|
|
|
|
self.put(
|
|
|
|
|
REL_DIR_MIGRATION_KEY,
|
|
|
|
|
Value::Image(
|
|
|
|
|
reldir_migration_history
|
|
|
|
|
.encode()
|
|
|
|
|
.context("failed to serialize rel dir migration history")
|
|
|
|
|
.map_err(WalIngestErrorKind::RelSizeV2Error)?,
|
|
|
|
|
),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if v2_mode.current_status != RelSizeMigration::Migrated {
|
|
|
|
|
@@ -2623,10 +2642,13 @@ impl DatadirModification<'_> {
|
|
|
|
|
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
|
|
|
|
|
ctx: &RequestContext,
|
|
|
|
|
) -> Result<(), WalIngestError> {
|
|
|
|
|
let dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
|
|
|
|
|
let reldir_migration_history = self.sparse_get(REL_DIR_MIGRATION_KEY, ctx).await?;
|
|
|
|
|
let reldir_migration_history = RelDirMigrationHistory::from_bytes(reldir_migration_history)
|
|
|
|
|
.context("failed to deserialize rel dir migration history")
|
|
|
|
|
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
|
|
|
|
let v2_mode = self
|
|
|
|
|
.maybe_enable_rel_size_v2(&dbdir, false)
|
|
|
|
|
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
|
|
|
|
.maybe_enable_rel_size_v2(&reldir_migration_history, false)
|
|
|
|
|
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
|
|
|
|
match v2_mode.current_status {
|
|
|
|
|
RelSizeMigration::Legacy => {
|
|
|
|
|
self.put_rel_drop_v1(drop_relations, ctx).await?;
|
|
|
|
|
@@ -3200,32 +3222,35 @@ impl Version<'_> {
|
|
|
|
|
//--- Metadata structs stored in key-value pairs in the repository.
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
|
|
|
pub(crate) struct RelSizeMigrationHistory {
|
|
|
|
|
pub(crate) status: Option<RelSizeMigration>,
|
|
|
|
|
pub(crate) struct RelDirMigrationHistory {
|
|
|
|
|
pub(crate) status: RelSizeMigration,
|
|
|
|
|
pub(crate) v2_enabled_at: Option<Lsn>,
|
|
|
|
|
pub(crate) v1_disabled_at: Option<Lsn>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl RelDirMigrationHistory {
|
|
|
|
|
pub(crate) fn from_bytes(bytes: Option<Bytes>) -> Result<Self, serde_json::Error> {
|
|
|
|
|
match bytes {
|
|
|
|
|
Some(bytes) => {
|
|
|
|
|
if bytes.is_empty() {
|
|
|
|
|
return Ok(Self::default());
|
|
|
|
|
}
|
|
|
|
|
let history = serde_json::from_slice(&bytes)?;
|
|
|
|
|
Ok(history)
|
|
|
|
|
}
|
|
|
|
|
None => Ok(Self::default()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) fn encode(&self) -> Result<Bytes, serde_json::Error> {
|
|
|
|
|
serde_json::to_vec(self).map(Bytes::from)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
|
|
|
pub(crate) struct DbDirectory {
|
|
|
|
|
// (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
|
|
|
|
|
pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
|
|
|
|
|
pub(crate) rel_dir_migration_status: Option<RelSizeMigrationHistory>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl DbDirectory {
|
|
|
|
|
pub(crate) fn get_persistent_rel_size_v2_status(&self) -> RelSizeMigration {
|
|
|
|
|
self.rel_dir_migration_status
|
|
|
|
|
.as_ref()
|
|
|
|
|
.and_then(|x| x.status.clone())
|
|
|
|
|
.unwrap_or(RelSizeMigration::Legacy)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) fn get_persistent_rel_size_v2_migrated_at(&self) -> Option<Lsn> {
|
|
|
|
|
self.rel_dir_migration_status
|
|
|
|
|
.as_ref()
|
|
|
|
|
.and_then(|x| x.v1_disabled_at)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
|
|
|
|
|
|