Compare commits

..

1 Commits

Author SHA1 Message Date
BodoBolero
0ef628fd9e test a variant with fixed compute size 2025-07-28 17:01:14 +02:00
24 changed files with 126 additions and 299 deletions

View File

@@ -31,15 +31,15 @@ jobs:
include:
- warehouses: 50 # defines number of warehouses and is used to compute number of terminals
max_rate: 800 # measured max TPS at scale factor based on experiments. Adjust if performance is better/worse
min_cu: 0.25 # simulate free tier plan (0.25 -2 CU)
min_cu: 2 # simulate free tier plan (0.25 -2 CU)
max_cu: 2
- warehouses: 500 # serverless plan (2-8 CU)
max_rate: 2000
min_cu: 2
min_cu: 8
max_cu: 8
- warehouses: 1000 # business plan (2-16 CU)
max_rate: 2900
min_cu: 2
min_cu: 16
max_cu: 16
max-parallel: 1 # we want to run each workload size sequentially to avoid noisy neighbors
permissions:

View File

@@ -571,11 +571,6 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'basebackup_cache_enabled' as bool")?,
rel_size_v1_access_disabled: settings
.remove("rel_size_v1_access_disabled")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'rel_size_v1_access_disabled' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")

View File

@@ -651,9 +651,6 @@ pub struct TenantConfigToml {
// FIXME: Remove skip_serializing_if when the feature is stable.
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub basebackup_cache_enabled: bool,
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub rel_size_v1_access_disabled: bool,
}
pub mod defaults {
@@ -962,7 +959,6 @@ impl Default for TenantConfigToml {
sampling_ratio: None,
relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY,
basebackup_cache_enabled: false,
rel_size_v1_access_disabled: false,
}
}
}

View File

@@ -519,15 +519,6 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
/// Key under which the rel-dir (rel_size v1 -> v2) migration history is stored.
///
/// It uses the same `field1` prefix as the other rel-dir keys (`REL_DIR_KEY_PREFIX`)
/// and leaves every remaining field at zero. Because it shares that prefix, code
/// that scans rel-dir keys has to skip this key explicitly instead of decoding it
/// as a `RelDirExists` value.
pub const REL_DIR_MIGRATION_KEY: Key = Key {
field1: REL_DIR_KEY_PREFIX,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
};
#[inline(always)]
pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key {
Key {

View File

@@ -646,8 +646,6 @@ pub struct TenantConfigPatch {
pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub basebackup_cache_enabled: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub rel_size_v1_access_disabled: FieldPatch<bool>,
}
/// Like [`crate::config::TenantConfigToml`], but preserves the information
@@ -785,9 +783,6 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub basebackup_cache_enabled: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rel_size_v1_access_disabled: Option<bool>,
}
impl TenantConfig {
@@ -835,7 +830,6 @@ impl TenantConfig {
mut sampling_ratio,
mut relsize_snapshot_cache_capacity,
mut basebackup_cache_enabled,
mut rel_size_v1_access_disabled,
} = self;
patch.checkpoint_distance.apply(&mut checkpoint_distance);
@@ -945,9 +939,6 @@ impl TenantConfig {
patch
.basebackup_cache_enabled
.apply(&mut basebackup_cache_enabled);
patch
.rel_size_v1_access_disabled
.apply(&mut rel_size_v1_access_disabled);
Ok(Self {
checkpoint_distance,
@@ -989,7 +980,6 @@ impl TenantConfig {
sampling_ratio,
relsize_snapshot_cache_capacity,
basebackup_cache_enabled,
rel_size_v1_access_disabled,
})
}
@@ -1104,9 +1094,6 @@ impl TenantConfig {
basebackup_cache_enabled: self
.basebackup_cache_enabled
.unwrap_or(global_conf.basebackup_cache_enabled),
rel_size_v1_access_disabled: self
.rel_size_v1_access_disabled
.unwrap_or(global_conf.rel_size_v1_access_disabled),
}
}
}
@@ -1539,15 +1526,12 @@ pub struct OffloadedTimelineInfo {
pub archived_at: chrono::DateTime<chrono::Utc>,
}
/// The state of the rel size migration. This is persisted in the DbDir key and index part. Do not change without considering
/// compatibility.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum RelSizeMigration {
/// The tenant is using the old rel_size format.
/// Note that this enum is persisted as `Option<RelSizeMigration>` in the index part, so
/// `None` is the same as `Some(RelSizeMigration::Legacy)`.
#[default]
Legacy,
/// The tenant is migrating to the new rel_size format. Both old and new rel_size format are
/// persisted in the storage. The read path will read both formats and validate them.

View File

@@ -484,7 +484,7 @@ async fn build_timeline_info_common(
*timeline.get_applied_gc_cutoff_lsn(),
);
let (rel_size_migration, rel_size_migrated_at) = timeline.get_rel_size_v2_cached_status();
let (rel_size_migration, rel_size_migrated_at) = timeline.get_rel_size_v2_status();
let info = TimelineInfo {
tenant_id: timeline.tenant_shard_id,

View File

@@ -16,10 +16,10 @@ use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use pageserver_api::key::{
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key,
REL_DIR_MIGRATION_KEY, RelDirExists, TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key,
rel_dir_to_key, rel_key_range, rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
};
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
@@ -685,16 +685,12 @@ impl Timeline {
// then check if the database was already initialized.
// get_rel_exists can be called before dbdir is created.
let buf = version.get(self, DBDIR_KEY, ctx).await?;
let dbdir = DbDirectory::des(&buf)?;
let dbdirs = &dbdir.dbdirs;
let dbdirs = DbDirectory::des(&buf)?.dbdirs;
if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
return Ok(false);
}
let migration_history = version.sparse_get(self, REL_DIR_MIGRATION_KEY, ctx).await?;
let migration_history = RelDirMigrationHistory::from_bytes(migration_history)
.context("failed to deserialize rel dir migration history")?;
let v2_status = migration_history.status;
let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
match v2_status {
RelSizeMigration::Legacy => {
@@ -703,6 +699,15 @@ impl Timeline {
.await?;
Ok(v1_exists)
}
RelSizeMigration::Migrating | RelSizeMigration::Migrated
if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
{
// For requests below the migrated LSN, we still use the v1 read path.
let v1_exists = self
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
.await?;
Ok(v1_exists)
}
RelSizeMigration::Migrating => {
let v1_exists = self
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
@@ -715,12 +720,9 @@ impl Timeline {
"inconsistent v1/v2 reldir keyspace for rel {}: v1_exists={}, v2_exists={}",
tag,
v1_exists,
v2_exists,
v2_exists
);
}
Err(e) if e.is_cancel() => {
// Cancellation errors are fine to ignore, do not log.
}
Err(e) => {
tracing::warn!("failed to get rel exists in v2: {e}");
}
@@ -778,10 +780,6 @@ impl Timeline {
.await?;
let mut rels = HashSet::new();
for (key, val) in results {
if key == REL_DIR_MIGRATION_KEY {
// The key that determines the current migration status, skip it.
continue;
}
let val = RelDirExists::decode(&val?).map_err(|_| {
PageReconstructError::Other(anyhow::anyhow!(
"invalid reldir key: decode failed, {}",
@@ -813,11 +811,11 @@ impl Timeline {
forknum: key.field5,
};
if val == RelDirExists::Removed {
debug_assert!(!rels.contains(&tag), "removed reltag in v2: {tag}");
debug_assert!(!rels.contains(&tag), "removed reltag in v2");
continue;
}
let did_not_contain = rels.insert(tag);
debug_assert!(did_not_contain, "duplicate reltag in v2: {tag}");
debug_assert!(did_not_contain, "duplicate reltag in v2");
}
Ok(rels)
}
@@ -837,16 +835,20 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
let reldir_migration_history = version.sparse_get(self, REL_DIR_MIGRATION_KEY, ctx).await?;
let reldir_migration_history = RelDirMigrationHistory::from_bytes(reldir_migration_history)
.context("failed to deserialize rel dir migration history")?;
let v2_status = reldir_migration_history.status;
let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
match v2_status {
RelSizeMigration::Legacy => {
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
Ok(rels_v1)
}
RelSizeMigration::Migrating | RelSizeMigration::Migrated
if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
{
// For requests below the migrated LSN, we still use the v1 read path.
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
Ok(rels_v1)
}
RelSizeMigration::Migrating => {
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
let rels_v2_res = self.list_rels_v2(spcnode, dbnode, version, ctx).await;
@@ -861,9 +863,6 @@ impl Timeline {
rels_v2.len()
);
}
Err(e) if e.is_cancel() => {
// Cancellation errors are fine to ignore, do not log.
}
Err(e) => {
tracing::warn!("failed to list rels in v2: {e}");
}
@@ -1725,8 +1724,6 @@ pub struct RelDirMode {
current_status: RelSizeMigration,
// Whether we should initialize the v2 keyspace or not.
initialize: bool,
// Whether we should disable v1 access starting at this LSN or not
disable_v1: bool,
}
impl DatadirModification<'_> {
@@ -2088,71 +2085,44 @@ impl DatadirModification<'_> {
///
/// As this function is only used on the write path, we do not need to read the migrated_at
/// field.
pub(crate) fn maybe_enable_rel_size_v2(
&mut self,
migration_history: &RelDirMigrationHistory,
is_create: bool,
) -> anyhow::Result<RelDirMode> {
pub fn maybe_enable_rel_size_v2(&mut self, is_create: bool) -> anyhow::Result<RelDirMode> {
// TODO: define the behavior of the tenant-level config flag and use feature flag to enable this feature
let expected_status = self.tline.get_rel_size_v2_expected_state();
let persistent_status = migration_history.status.clone();
// Only initialize the v2 keyspace on new relation creation. No initialization
// during `timeline_create` (TODO: fix this, we should allow, but currently it
// hits consistency issues).
let can_update = is_create && !self.is_importing_pgdata;
match (expected_status, persistent_status) {
(RelSizeMigration::Legacy, RelSizeMigration::Legacy) => {
let (status, _) = self.tline.get_rel_size_v2_status();
let config = self.tline.get_rel_size_v2_enabled();
match (config, status) {
(false, RelSizeMigration::Legacy) => {
// tenant config didn't enable it and we didn't write any reldir_v2 key yet
Ok(RelDirMode {
current_status: RelSizeMigration::Legacy,
initialize: false,
disable_v1: false,
})
}
(
RelSizeMigration::Legacy,
current_status @ RelSizeMigration::Migrating
| current_status @ RelSizeMigration::Migrated,
) => {
// already persisted that the timeline has enabled rel_size_v2, cannot rollback
(false, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
// index_part already persisted that the timeline has enabled rel_size_v2
Ok(RelDirMode {
current_status,
current_status: status,
initialize: false,
disable_v1: false,
})
}
(
expected_status @ RelSizeMigration::Migrating
| expected_status @ RelSizeMigration::Migrated,
RelSizeMigration::Legacy,
) => {
(true, RelSizeMigration::Legacy) => {
// The first time we enable it, we need to persist it in `index_part.json`
// The caller should update the reldir status once the initialization is done.
//
// Only initialize the v2 keyspace on new relation creation. No initialization
// during `timeline_create` (TODO: fix this, we should allow, but currently it
// hits consistency issues).
Ok(RelDirMode {
current_status: RelSizeMigration::Legacy,
initialize: can_update,
disable_v1: can_update && expected_status == RelSizeMigration::Migrated,
initialize: is_create && !self.is_importing_pgdata,
})
}
(RelSizeMigration::Migrating, current_status @ RelSizeMigration::Migrating)
| (RelSizeMigration::Migrated, current_status @ RelSizeMigration::Migrated)
| (RelSizeMigration::Migrating, current_status @ RelSizeMigration::Migrated) => {
// Keep the current state
(true, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
// index_part already persisted that the timeline has enabled rel_size_v2
// and we don't need to do anything
Ok(RelDirMode {
current_status,
current_status: status,
initialize: false,
disable_v1: false,
})
}
(RelSizeMigration::Migrated, RelSizeMigration::Migrating) => {
// Switch to v2-only mode
Ok(RelDirMode {
current_status: RelSizeMigration::Migrating,
initialize: false,
disable_v1: can_update,
})
}
}
@@ -2166,19 +2136,14 @@ impl DatadirModification<'_> {
img: Bytes,
ctx: &RequestContext,
) -> Result<(), WalIngestError> {
let v2_mode = self
.maybe_enable_rel_size_v2(false)
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
// Add it to the directory (if it doesn't exist already)
let buf = self.get(DBDIR_KEY, ctx).await?;
let mut dbdir = DbDirectory::des(&buf)?;
let reldir_migration_history = self.sparse_get(REL_DIR_MIGRATION_KEY, ctx).await?;
let reldir_migration_history = RelDirMigrationHistory::from_bytes(reldir_migration_history)
.context("failed to deserialize rel dir migration history")
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
let v2_mode = self
.maybe_enable_rel_size_v2(&reldir_migration_history, false)
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
if r.is_none() || r == Some(false) {
// The dbdir entry didn't exist, or it contained a
@@ -2325,6 +2290,15 @@ impl DatadirModification<'_> {
sparse_rel_dir_key,
Value::Image(RelDirExists::Exists.encode()),
);
tracing::info!(
"migrated rel_size_v2: {}",
RelTag {
spcnode,
dbnode,
relnode,
forknum
}
);
rel_cnt += 1;
}
}
@@ -2333,6 +2307,9 @@ impl DatadirModification<'_> {
self.lsn,
rel_cnt
);
self.tline
.update_rel_size_v2_status(RelSizeMigration::Migrating, Some(self.lsn))
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
Ok::<_, WalIngestError>(())
}
@@ -2415,32 +2392,25 @@ impl DatadirModification<'_> {
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
let reldir_migration_history = self.sparse_get(REL_DIR_MIGRATION_KEY, ctx).await?;
let mut reldir_migration_history =
RelDirMigrationHistory::from_bytes(reldir_migration_history)
.context("failed to deserialize rel dir migration history")
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
let mut is_dbdir_dirty = false;
let mut is_reldirv2_status_dirty = false;
let dbdir_exists =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir)?;
self.pending_directory_entries.push((
DirectoryKind::Db,
MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
));
is_dbdir_dirty = true;
self.put(DBDIR_KEY, Value::Image(buf.into()));
false
} else {
true
};
let mut v2_mode = self
.maybe_enable_rel_size_v2(&reldir_migration_history, true)
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
.maybe_enable_rel_size_v2(true)
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
if v2_mode.initialize {
if let Err(e) = self.initialize_rel_size_v2_keyspace(ctx, &dbdir).await {
@@ -2448,40 +2418,8 @@ impl DatadirModification<'_> {
// TODO: circuit breaker so that it won't retry forever
} else {
v2_mode.current_status = RelSizeMigration::Migrating;
reldir_migration_history.status = RelSizeMigration::Migrating;
reldir_migration_history.v2_enabled_at = Some(self.lsn);
is_reldirv2_status_dirty = true;
}
}
if v2_mode.disable_v1 {
v2_mode.current_status = RelSizeMigration::Migrated;
reldir_migration_history.status = RelSizeMigration::Migrated;
reldir_migration_history.v1_disabled_at = Some(self.lsn);
is_reldirv2_status_dirty = true;
}
if is_dbdir_dirty {
let buf = DbDirectory::ser(&dbdir)?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
}
if is_reldirv2_status_dirty {
self.tline
.update_rel_size_v2_status(
reldir_migration_history.status.clone(),
reldir_migration_history.v2_enabled_at,
)
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
self.put(
REL_DIR_MIGRATION_KEY,
Value::Image(
reldir_migration_history
.encode()
.context("failed to serialize rel dir migration history")
.map_err(WalIngestErrorKind::RelSizeV2Error)?,
),
);
}
if v2_mode.current_status != RelSizeMigration::Migrated {
self.put_rel_creation_v1(rel, dbdir_exists, ctx).await?;
@@ -2642,13 +2580,9 @@ impl DatadirModification<'_> {
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> Result<(), WalIngestError> {
let reldir_migration_history = self.sparse_get(REL_DIR_MIGRATION_KEY, ctx).await?;
let reldir_migration_history = RelDirMigrationHistory::from_bytes(reldir_migration_history)
.context("failed to deserialize rel dir migration history")
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
let v2_mode = self
.maybe_enable_rel_size_v2(&reldir_migration_history, false)
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
.maybe_enable_rel_size_v2(false)
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
match v2_mode.current_status {
RelSizeMigration::Legacy => {
self.put_rel_drop_v1(drop_relations, ctx).await?;
@@ -2666,12 +2600,6 @@ impl DatadirModification<'_> {
);
}
}
Err(WalIngestError {
kind: WalIngestErrorKind::Cancelled,
..
}) => {
// Cancellation errors are fine to ignore, do not log.
}
Err(e) => {
tracing::warn!("error dropping rels: {}", e);
}
@@ -3221,32 +3149,6 @@ impl Version<'_> {
//--- Metadata structs stored in key-value pairs in the repository.
/// Record of the rel-dir migration state, stored as a JSON value under
/// `REL_DIR_MIGRATION_KEY`.
///
/// A missing or empty value is treated as the default history (see
/// [`RelDirMigrationHistory::from_bytes`]).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub(crate) struct RelDirMigrationHistory {
    /// Migration status recorded for the timeline.
    pub(crate) status: RelSizeMigration,
    /// LSN recorded when the status is switched to `Migrating`, if that has happened.
    pub(crate) v2_enabled_at: Option<Lsn>,
    /// LSN recorded when the status is switched to `Migrated`, if that has happened.
    pub(crate) v1_disabled_at: Option<Lsn>,
}

impl RelDirMigrationHistory {
    /// Decodes a history from the raw value read from storage.
    ///
    /// Both an absent value (`None`) and a present-but-empty buffer yield
    /// `Self::default()`; anything else is parsed as JSON.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if a non-empty buffer is not a valid
    /// JSON encoding of this struct.
    pub(crate) fn from_bytes(bytes: Option<Bytes>) -> Result<Self, serde_json::Error> {
        match bytes {
            Some(raw) if !raw.is_empty() => serde_json::from_slice(&raw),
            // No value, or an empty one: nothing has been recorded yet.
            _ => Ok(Self::default()),
        }
    }

    /// Serializes the history to JSON bytes, the inverse of [`Self::from_bytes`].
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if serialization fails.
    pub(crate) fn encode(&self) -> Result<Bytes, serde_json::Error> {
        let json = serde_json::to_vec(self)?;
        Ok(Bytes::from(json))
    }
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DbDirectory {
// (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)

View File

@@ -5090,8 +5090,7 @@ impl TenantShard {
src_timeline.pg_version,
);
let (rel_size_v2_status, rel_size_migrated_at) =
src_timeline.get_rel_size_v2_cached_status();
let (rel_size_v2_status, rel_size_migrated_at) = src_timeline.get_rel_size_v2_status();
let (uninitialized_timeline, _timeline_ctx) = self
.prepare_new_timeline(
dst_id,

View File

@@ -83,7 +83,6 @@ pub struct IndexPart {
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) last_aux_file_policy: Option<AuxFilePolicy>,
/// Deprecated: the field is not used anymore and the source of truth is now stored in the dbdir key.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) rel_size_migration: Option<RelSizeMigration>,
@@ -116,7 +115,8 @@ pub struct IndexPart {
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
/// Deprecated: the field is not used anymore and the source of truth is now stored in the dbdir key.
/// The LSN at which we started the rel size migration. Accesses below this LSN should be
/// processed with the v1 read path. Usually this LSN should be set together with `rel_size_migration`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) rel_size_migrated_at: Option<Lsn>,
}

View File

@@ -441,7 +441,7 @@ pub struct Timeline {
/// heatmap on demand.
heatmap_layers_downloader: Mutex<Option<heatmap_layers_downloader::HeatmapLayersDownloader>>,
pub(crate) rel_size_v2_cached_status: ArcSwap<(Option<RelSizeMigration>, Option<Lsn>)>,
pub(crate) rel_size_v2_status: ArcSwap<(Option<RelSizeMigration>, Option<Lsn>)>,
wait_lsn_log_slow: tokio::sync::Semaphore,
@@ -2883,33 +2883,19 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
/// Returns the expected state of the rel size migration. The actual state is persisted in the
/// DbDir key.
///
/// The expected state is the state that the tenant config expects.
pub(crate) fn get_rel_size_v2_expected_state(&self) -> RelSizeMigration {
/// Returns `true` if the rel_size_v2 config is enabled. NOTE: the write path and read path
/// should look at `get_rel_size_v2_status()` to get the actual status of the timeline. It is
/// possible that the index part persists the state while the config doesn't get persisted.
pub(crate) fn get_rel_size_v2_enabled(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
let v2_enabled = tenant_conf
tenant_conf
.tenant_conf
.rel_size_v2_enabled
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled);
let v1_access_disabled = tenant_conf
.tenant_conf
.rel_size_v1_access_disabled
.unwrap_or(self.conf.default_tenant_conf.rel_size_v1_access_disabled);
match (v2_enabled, v1_access_disabled) {
(true, false) => RelSizeMigration::Migrating,
(true, true) => RelSizeMigration::Migrated,
(false, true) => RelSizeMigration::Legacy, // This should never happen
(false, false) => RelSizeMigration::Legacy,
}
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
}
/// DO NOT use this API in the read/write path to determine the rel size migration status. The source of truth is the dbdir key.
/// This API is only used for the timeline info struct to get the latest cached status.
pub(crate) fn get_rel_size_v2_cached_status(&self) -> (RelSizeMigration, Option<Lsn>) {
let (status, migrated_at) = self.rel_size_v2_cached_status.load().as_ref().clone();
pub(crate) fn get_rel_size_v2_status(&self) -> (RelSizeMigration, Option<Lsn>) {
let (status, migrated_at) = self.rel_size_v2_status.load().as_ref().clone();
(status.unwrap_or(RelSizeMigration::Legacy), migrated_at)
}
@@ -3350,7 +3336,7 @@ impl Timeline {
heatmap_layers_downloader: Mutex::new(None),
rel_size_v2_cached_status: ArcSwap::from_pointee((
rel_size_v2_status: ArcSwap::from_pointee((
rel_size_v2_status,
rel_size_migrated_at,
)),
@@ -3443,11 +3429,10 @@ impl Timeline {
rel_size_v2_status: RelSizeMigration,
rel_size_migrated_at: Option<Lsn>,
) -> anyhow::Result<()> {
self.rel_size_v2_cached_status.store(Arc::new((
self.rel_size_v2_status.store(Arc::new((
Some(rel_size_v2_status.clone()),
rel_size_migrated_at,
)));
// The index_part upload is not used as source of truth anymore, but we still need to upload it to make it work across branches.
self.remote_client
.schedule_index_upload_for_rel_size_v2_status_update(
rel_size_v2_status,

View File

@@ -135,7 +135,7 @@ pub enum WalIngestErrorKind {
#[error(transparent)]
EncodeAuxFileError(anyhow::Error),
#[error(transparent)]
RelSizeV2Error(anyhow::Error),
MaybeRelSizeV2Error(anyhow::Error),
#[error("timeline shutting down")]
Cancelled,

View File

@@ -79,6 +79,10 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
##__VA_ARGS__)

View File

@@ -635,11 +635,6 @@ lfc_init(void)
NULL);
}
/*
* Dump a list of pages that are currently in the LFC
*
* This is used to get a snapshot that can be used to prewarm the LFC later.
*/
FileCacheState*
lfc_get_state(size_t max_entries)
{
@@ -2272,3 +2267,4 @@ get_prewarm_info(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* neon.c
* Main entry point into the neon extension
* Main entry point into the neon extension
*
*-------------------------------------------------------------------------
*/
@@ -508,7 +508,7 @@ _PG_init(void)
DefineCustomBoolVariable(
"neon.disable_logical_replication_subscribers",
"Disable incoming logical replication",
"Disables incomming logical replication",
NULL,
&disable_logical_replication_subscribers,
false,
@@ -567,7 +567,7 @@ _PG_init(void)
DefineCustomEnumVariable(
"neon.debug_compare_local",
"Debug mode for comparing content of pages in prefetch ring/LFC/PS and local disk",
"Debug mode for compaing content of pages in prefetch ring/LFC/PS and local disk",
NULL,
&debug_compare_local,
DEBUG_COMPARE_LOCAL_NONE,
@@ -735,6 +735,7 @@ neon_shmem_request_hook(void)
static void
neon_shmem_startup_hook(void)
{
/* Initialize */
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();

View File

@@ -167,7 +167,11 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
*/
#define NUM_NEON_PERF_COUNTER_SLOTS (MaxBackends + NUM_AUXILIARY_PROCS)
#if PG_VERSION_NUM >= 170000
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProcNumber])
#else
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProc->pgprocno])
#endif
extern void inc_getpage_wait(uint64 latency);
extern void inc_page_cache_read_wait(uint64 latency);

View File

@@ -9,10 +9,6 @@
#include "fmgr.h"
#include "storage/buf_internals.h"
#if PG_MAJORVERSION_NUM < 16
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#if PG_MAJORVERSION_NUM < 17
#define NRelFileInfoBackendIsTemp(rinfo) (rinfo.backend != InvalidBackendId)
#else
@@ -162,10 +158,6 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define AmAutoVacuumWorkerProcess() (IsAutoVacuumWorkerProcess())
#endif
#if PG_MAJORVERSION_NUM < 17
#define MyProcNumber (MyProc - &ProcGlobal->allProcs[0])
#endif
#if PG_MAJORVERSION_NUM < 15
extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
extern TimeLineID GetWALInsertionTimeLine(void);

View File

@@ -72,6 +72,10 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#include "access/nbtree.h"
#include "storage/bufpage.h"
#include "access/xlog_internal.h"

View File

@@ -13,7 +13,6 @@
#include "neon.h"
#include "neon_pgversioncompat.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include RELFILEINFO_HDR
#include "storage/smgr.h"
@@ -24,6 +23,10 @@
#include "utils/dynahash.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 150000
#include "miscadmin.h"
#endif
typedef struct
{
NRelFileInfo rinfo;

View File

@@ -1312,8 +1312,8 @@ class NeonEnv:
)
tenant_config = ps_cfg.setdefault("tenant_config", {})
# Enable relsize_v2 by default in tests.
tenant_config["rel_size_v2_enabled"] = True
# This feature is pending rollout.
# tenant_config["rel_size_v2_enabled"] = True
# Test authors tend to forget about the default 10min initial lease deadline
# when writing tests, which turns their immediate gc requests via mgmt API

View File

@@ -80,12 +80,7 @@ def test_perf_simple_many_relations_reldir(
"""
Test creating many relations in a single database.
"""
env = neon_env_builder.init_start(
initial_tenant_conf={
"rel_size_v2_enabled": reldir != "v1",
"rel_size_v1_access_disabled": reldir == "v2",
}
)
env = neon_env_builder.init_start(initial_tenant_conf={"rel_size_v2_enabled": reldir != "v1"})
ep = env.endpoints.create_start(
"main",
config_lines=[
@@ -113,6 +108,10 @@ def test_perf_simple_many_relations_reldir(
== "migrating"
)
elif reldir == "v2":
# only read/write to the v2 keyspace
env.pageserver.http_client().timeline_patch_index_part(
env.initial_tenant, env.initial_timeline, {"rel_size_migration": "migrated"}
)
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migration"

View File

@@ -183,7 +183,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"lsn_lease_length": "1m",
"lsn_lease_length_for_ts": "5s",
"timeline_offloading": False,
"rel_size_v2_enabled": False,
"rel_size_v2_enabled": True,
"relsize_snapshot_cache_capacity": 10000,
"gc_compaction_enabled": False,
"gc_compaction_verification": False,
@@ -194,7 +194,6 @@ def test_fully_custom_config(positive_env: NeonEnv):
"numerator": 0,
"denominator": 10,
},
"rel_size_v1_access_disabled": True,
}
vps_http = env.storage_controller.pageserver_api()

View File

@@ -581,10 +581,12 @@ def test_historic_storage_formats(
# This dataset was created at a time where we decided to migrate to v2 reldir by simply disabling writes to v1
# and starting writing to v2. This was too risky and we have reworked the migration plan. Therefore, we should
# opt in full relv2 mode for this dataset.
env.pageserver.http_client().patch_tenant_config(
dataset.tenant_id,
{"rel_size_v1_access_disabled": True, "rel_size_v2_enabled": True},
)
for timeline in timelines:
env.pageserver.http_client().timeline_patch_index_part(
dataset.tenant_id,
timeline["timeline_id"],
{"force_index_update": True, "rel_size_migration": "migrated"},
)
# Import tenant does not create the timeline on safekeepers,
# because it is a debug handler and the timeline may have already been

View File

@@ -123,10 +123,9 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End
def patch_tenant_conf(tenant_conf: dict[str, Any], reldir_type: str) -> dict[str, Any]:
tenant_conf = tenant_conf.copy()
if reldir_type == "v2":
tenant_conf["rel_size_v2_enabled"] = True
tenant_conf["rel_size_v1_access_disabled"] = True
tenant_conf["rel_size_v2_enabled"] = "true"
else:
tenant_conf["rel_size_v2_enabled"] = False
tenant_conf["rel_size_v2_enabled"] = "false"
return tenant_conf

View File

@@ -298,26 +298,15 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
assert post_detach_samples == set()
@pytest.mark.parametrize("compaction", ["compaction_enabled", "compaction_disabled"])
def test_pageserver_metrics_removed_after_offload(
neon_env_builder: NeonEnvBuilder, compaction: str
):
def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuilder):
"""Tests that when a timeline is offloaded, the tenant specific metrics are not left behind"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_1, _ = env.create_tenant(
conf={
# disable background compaction and GC so that we don't have leftover tasks
# after offloading.
"gc_period": "0s",
"compaction_period": "0s",
}
if compaction == "compaction_disabled"
else None
)
tenant_1, _ = env.create_tenant()
timeline_1 = env.create_timeline("test_metrics_removed_after_offload_1", tenant_id=tenant_1)
timeline_2 = env.create_timeline("test_metrics_removed_after_offload_2", tenant_id=tenant_1)
@@ -362,23 +351,6 @@ def test_pageserver_metrics_removed_after_offload(
state=TimelineArchivalState.ARCHIVED,
)
env.pageserver.http_client().timeline_offload(tenant_1, timeline)
# We need to wait until all background jobs are finished before we can check the metrics.
# There're many of them: compaction, GC, etc.
wait_until(
lambda: all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_waiting_tasks")
)
and all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_running_tasks")
)
)
post_offload_samples = set(
[x.name for x in get_ps_metric_samples_for_timeline(tenant_1, timeline)]
)