mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 21:40:39 +00:00
Compare commits
23 Commits
skyzh/try-
...
min_inflig
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3fe7f8802a | ||
|
|
7841303df9 | ||
|
|
0e9d3fd2b1 | ||
|
|
b2a87b501f | ||
|
|
06417f2ff9 | ||
|
|
252876515c | ||
|
|
d56a72afec | ||
|
|
3847ab73a7 | ||
|
|
3e5bbe7027 | ||
|
|
546a45f57a | ||
|
|
588cb289d5 | ||
|
|
b947deb07c | ||
|
|
2f455baa73 | ||
|
|
f2e65e1d2c | ||
|
|
df49def453 | ||
|
|
1ef0f71a95 | ||
|
|
96df649858 | ||
|
|
9bfba1b087 | ||
|
|
b41b85f8ec | ||
|
|
32b801ea1c | ||
|
|
89496a32d0 | ||
|
|
9617b8d328 | ||
|
|
a504516b8c |
@@ -571,11 +571,6 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'basebackup_cache_enabled' as bool")?,
|
||||
rel_size_v1_access_disabled: settings
|
||||
.remove("rel_size_v1_access_disabled")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'rel_size_v1_access_disabled' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -651,9 +651,6 @@ pub struct TenantConfigToml {
|
||||
// FIXME: Remove skip_serializing_if when the feature is stable.
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
pub basebackup_cache_enabled: bool,
|
||||
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
pub rel_size_v1_access_disabled: bool,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
@@ -962,7 +959,6 @@ impl Default for TenantConfigToml {
|
||||
sampling_ratio: None,
|
||||
relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY,
|
||||
basebackup_cache_enabled: false,
|
||||
rel_size_v1_access_disabled: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -519,15 +519,6 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
|
||||
}
|
||||
}
|
||||
|
||||
pub const REL_DIR_MIGRATION_KEY: Key = Key {
|
||||
field1: REL_DIR_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
};
|
||||
|
||||
#[inline(always)]
|
||||
pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key {
|
||||
Key {
|
||||
|
||||
@@ -646,8 +646,6 @@ pub struct TenantConfigPatch {
|
||||
pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub basebackup_cache_enabled: FieldPatch<bool>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub rel_size_v1_access_disabled: FieldPatch<bool>,
|
||||
}
|
||||
|
||||
/// Like [`crate::config::TenantConfigToml`], but preserves the information
|
||||
@@ -785,9 +783,6 @@ pub struct TenantConfig {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub basebackup_cache_enabled: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub rel_size_v1_access_disabled: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
@@ -835,7 +830,6 @@ impl TenantConfig {
|
||||
mut sampling_ratio,
|
||||
mut relsize_snapshot_cache_capacity,
|
||||
mut basebackup_cache_enabled,
|
||||
mut rel_size_v1_access_disabled,
|
||||
} = self;
|
||||
|
||||
patch.checkpoint_distance.apply(&mut checkpoint_distance);
|
||||
@@ -945,9 +939,6 @@ impl TenantConfig {
|
||||
patch
|
||||
.basebackup_cache_enabled
|
||||
.apply(&mut basebackup_cache_enabled);
|
||||
patch
|
||||
.rel_size_v1_access_disabled
|
||||
.apply(&mut rel_size_v1_access_disabled);
|
||||
|
||||
Ok(Self {
|
||||
checkpoint_distance,
|
||||
@@ -989,7 +980,6 @@ impl TenantConfig {
|
||||
sampling_ratio,
|
||||
relsize_snapshot_cache_capacity,
|
||||
basebackup_cache_enabled,
|
||||
rel_size_v1_access_disabled,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1104,9 +1094,6 @@ impl TenantConfig {
|
||||
basebackup_cache_enabled: self
|
||||
.basebackup_cache_enabled
|
||||
.unwrap_or(global_conf.basebackup_cache_enabled),
|
||||
rel_size_v1_access_disabled: self
|
||||
.rel_size_v1_access_disabled
|
||||
.unwrap_or(global_conf.rel_size_v1_access_disabled),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1539,15 +1526,12 @@ pub struct OffloadedTimelineInfo {
|
||||
pub archived_at: chrono::DateTime<chrono::Utc>,
|
||||
}
|
||||
|
||||
/// The state of the rel size migration. This is persisted in the DbDir key and index part. Do not change without considering
|
||||
/// compatibility.
|
||||
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum RelSizeMigration {
|
||||
/// The tenant is using the old rel_size format.
|
||||
/// Note that this enum is persisted as `Option<RelSizeMigration>` in the index part, so
|
||||
/// `None` is the same as `Some(RelSizeMigration::Legacy)`.
|
||||
#[default]
|
||||
Legacy,
|
||||
/// The tenant is migrating to the new rel_size format. Both old and new rel_size format are
|
||||
/// persisted in the storage. The read path will read both formats and validate them.
|
||||
|
||||
@@ -484,7 +484,7 @@ async fn build_timeline_info_common(
|
||||
*timeline.get_applied_gc_cutoff_lsn(),
|
||||
);
|
||||
|
||||
let (rel_size_migration, rel_size_migrated_at) = timeline.get_rel_size_v2_cached_status();
|
||||
let (rel_size_migration, rel_size_migrated_at) = timeline.get_rel_size_v2_status();
|
||||
|
||||
let info = TimelineInfo {
|
||||
tenant_id: timeline.tenant_shard_id,
|
||||
|
||||
@@ -16,10 +16,10 @@ use anyhow::Context;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use pageserver_api::key::{
|
||||
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key,
|
||||
REL_DIR_MIGRATION_KEY, RelDirExists, TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key,
|
||||
rel_dir_to_key, rel_key_range, rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range,
|
||||
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
|
||||
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
|
||||
rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
|
||||
repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
||||
};
|
||||
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
|
||||
@@ -685,16 +685,12 @@ impl Timeline {
|
||||
// then check if the database was already initialized.
|
||||
// get_rel_exists can be called before dbdir is created.
|
||||
let buf = version.get(self, DBDIR_KEY, ctx).await?;
|
||||
let dbdir = DbDirectory::des(&buf)?;
|
||||
let dbdirs = &dbdir.dbdirs;
|
||||
let dbdirs = DbDirectory::des(&buf)?.dbdirs;
|
||||
if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let migration_history = version.sparse_get(self, REL_DIR_MIGRATION_KEY, ctx).await?;
|
||||
let migration_history = RelDirMigrationHistory::from_bytes(migration_history)
|
||||
.context("failed to deserialize rel dir migration history")?;
|
||||
let v2_status = migration_history.status;
|
||||
let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
|
||||
|
||||
match v2_status {
|
||||
RelSizeMigration::Legacy => {
|
||||
@@ -703,6 +699,15 @@ impl Timeline {
|
||||
.await?;
|
||||
Ok(v1_exists)
|
||||
}
|
||||
RelSizeMigration::Migrating | RelSizeMigration::Migrated
|
||||
if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
|
||||
{
|
||||
// For requests below the migrated LSN, we still use the v1 read path.
|
||||
let v1_exists = self
|
||||
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
|
||||
.await?;
|
||||
Ok(v1_exists)
|
||||
}
|
||||
RelSizeMigration::Migrating => {
|
||||
let v1_exists = self
|
||||
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
|
||||
@@ -715,12 +720,9 @@ impl Timeline {
|
||||
"inconsistent v1/v2 reldir keyspace for rel {}: v1_exists={}, v2_exists={}",
|
||||
tag,
|
||||
v1_exists,
|
||||
v2_exists,
|
||||
v2_exists
|
||||
);
|
||||
}
|
||||
Err(e) if e.is_cancel() => {
|
||||
// Cancellation errors are fine to ignore, do not log.
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to get rel exists in v2: {e}");
|
||||
}
|
||||
@@ -778,10 +780,6 @@ impl Timeline {
|
||||
.await?;
|
||||
let mut rels = HashSet::new();
|
||||
for (key, val) in results {
|
||||
if key == REL_DIR_MIGRATION_KEY {
|
||||
// The key that determines the current migration status, skip it.
|
||||
continue;
|
||||
}
|
||||
let val = RelDirExists::decode(&val?).map_err(|_| {
|
||||
PageReconstructError::Other(anyhow::anyhow!(
|
||||
"invalid reldir key: decode failed, {}",
|
||||
@@ -813,11 +811,11 @@ impl Timeline {
|
||||
forknum: key.field5,
|
||||
};
|
||||
if val == RelDirExists::Removed {
|
||||
debug_assert!(!rels.contains(&tag), "removed reltag in v2: {tag}");
|
||||
debug_assert!(!rels.contains(&tag), "removed reltag in v2");
|
||||
continue;
|
||||
}
|
||||
let did_not_contain = rels.insert(tag);
|
||||
debug_assert!(did_not_contain, "duplicate reltag in v2: {tag}");
|
||||
debug_assert!(did_not_contain, "duplicate reltag in v2");
|
||||
}
|
||||
Ok(rels)
|
||||
}
|
||||
@@ -837,16 +835,20 @@ impl Timeline {
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashSet<RelTag>, PageReconstructError> {
|
||||
let reldir_migration_history = version.sparse_get(self, REL_DIR_MIGRATION_KEY, ctx).await?;
|
||||
let reldir_migration_history = RelDirMigrationHistory::from_bytes(reldir_migration_history)
|
||||
.context("failed to deserialize rel dir migration history")?;
|
||||
let v2_status = reldir_migration_history.status;
|
||||
let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
|
||||
|
||||
match v2_status {
|
||||
RelSizeMigration::Legacy => {
|
||||
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
|
||||
Ok(rels_v1)
|
||||
}
|
||||
RelSizeMigration::Migrating | RelSizeMigration::Migrated
|
||||
if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
|
||||
{
|
||||
// For requests below the migrated LSN, we still use the v1 read path.
|
||||
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
|
||||
Ok(rels_v1)
|
||||
}
|
||||
RelSizeMigration::Migrating => {
|
||||
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
|
||||
let rels_v2_res = self.list_rels_v2(spcnode, dbnode, version, ctx).await;
|
||||
@@ -861,9 +863,6 @@ impl Timeline {
|
||||
rels_v2.len()
|
||||
);
|
||||
}
|
||||
Err(e) if e.is_cancel() => {
|
||||
// Cancellation errors are fine to ignore, do not log.
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to list rels in v2: {e}");
|
||||
}
|
||||
@@ -1725,8 +1724,6 @@ pub struct RelDirMode {
|
||||
current_status: RelSizeMigration,
|
||||
// Whether we should initialize the v2 keyspace or not.
|
||||
initialize: bool,
|
||||
// Whether we should disable v1 access starting this LSNor not
|
||||
disable_v1: bool,
|
||||
}
|
||||
|
||||
impl DatadirModification<'_> {
|
||||
@@ -2088,71 +2085,44 @@ impl DatadirModification<'_> {
|
||||
///
|
||||
/// As this function is only used on the write path, we do not need to read the migrated_at
|
||||
/// field.
|
||||
pub(crate) fn maybe_enable_rel_size_v2(
|
||||
&mut self,
|
||||
migration_history: &RelDirMigrationHistory,
|
||||
is_create: bool,
|
||||
) -> anyhow::Result<RelDirMode> {
|
||||
pub fn maybe_enable_rel_size_v2(&mut self, is_create: bool) -> anyhow::Result<RelDirMode> {
|
||||
// TODO: define the behavior of the tenant-level config flag and use feature flag to enable this feature
|
||||
let expected_status = self.tline.get_rel_size_v2_expected_state();
|
||||
let persistent_status = migration_history.status.clone();
|
||||
|
||||
// Only initialize the v2 keyspace on new relation creation. No initialization
|
||||
// during `timeline_create` (TODO: fix this, we should allow, but currently it
|
||||
// hits consistency issues).
|
||||
let can_update = is_create && !self.is_importing_pgdata;
|
||||
|
||||
match (expected_status, persistent_status) {
|
||||
(RelSizeMigration::Legacy, RelSizeMigration::Legacy) => {
|
||||
let (status, _) = self.tline.get_rel_size_v2_status();
|
||||
let config = self.tline.get_rel_size_v2_enabled();
|
||||
match (config, status) {
|
||||
(false, RelSizeMigration::Legacy) => {
|
||||
// tenant config didn't enable it and we didn't write any reldir_v2 key yet
|
||||
Ok(RelDirMode {
|
||||
current_status: RelSizeMigration::Legacy,
|
||||
initialize: false,
|
||||
disable_v1: false,
|
||||
})
|
||||
}
|
||||
(
|
||||
RelSizeMigration::Legacy,
|
||||
current_status @ RelSizeMigration::Migrating
|
||||
| current_status @ RelSizeMigration::Migrated,
|
||||
) => {
|
||||
// already persisted that the timeline has enabled rel_size_v2, cannot rollback
|
||||
(false, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
|
||||
// index_part already persisted that the timeline has enabled rel_size_v2
|
||||
Ok(RelDirMode {
|
||||
current_status,
|
||||
current_status: status,
|
||||
initialize: false,
|
||||
disable_v1: false,
|
||||
})
|
||||
}
|
||||
(
|
||||
expected_status @ RelSizeMigration::Migrating
|
||||
| expected_status @ RelSizeMigration::Migrated,
|
||||
RelSizeMigration::Legacy,
|
||||
) => {
|
||||
(true, RelSizeMigration::Legacy) => {
|
||||
// The first time we enable it, we need to persist it in `index_part.json`
|
||||
// The caller should update the reldir status once the initialization is done.
|
||||
|
||||
//
|
||||
// Only initialize the v2 keyspace on new relation creation. No initialization
|
||||
// during `timeline_create` (TODO: fix this, we should allow, but currently it
|
||||
// hits consistency issues).
|
||||
Ok(RelDirMode {
|
||||
current_status: RelSizeMigration::Legacy,
|
||||
initialize: can_update,
|
||||
disable_v1: can_update && expected_status == RelSizeMigration::Migrated,
|
||||
initialize: is_create && !self.is_importing_pgdata,
|
||||
})
|
||||
}
|
||||
(RelSizeMigration::Migrating, current_status @ RelSizeMigration::Migrating)
|
||||
| (RelSizeMigration::Migrated, current_status @ RelSizeMigration::Migrated)
|
||||
| (RelSizeMigration::Migrating, current_status @ RelSizeMigration::Migrated) => {
|
||||
// Keep the current state
|
||||
(true, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
|
||||
// index_part already persisted that the timeline has enabled rel_size_v2
|
||||
// and we don't need to do anything
|
||||
Ok(RelDirMode {
|
||||
current_status,
|
||||
current_status: status,
|
||||
initialize: false,
|
||||
disable_v1: false,
|
||||
})
|
||||
}
|
||||
(RelSizeMigration::Migrated, RelSizeMigration::Migrating) => {
|
||||
// Switch to v2-only mode
|
||||
Ok(RelDirMode {
|
||||
current_status: RelSizeMigration::Migrating,
|
||||
initialize: false,
|
||||
disable_v1: can_update,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -2166,19 +2136,14 @@ impl DatadirModification<'_> {
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), WalIngestError> {
|
||||
let v2_mode = self
|
||||
.maybe_enable_rel_size_v2(false)
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
|
||||
// Add it to the directory (if it doesn't exist already)
|
||||
let buf = self.get(DBDIR_KEY, ctx).await?;
|
||||
let mut dbdir = DbDirectory::des(&buf)?;
|
||||
|
||||
let reldir_migration_history = self.sparse_get(REL_DIR_MIGRATION_KEY, ctx).await?;
|
||||
let reldir_migration_history = RelDirMigrationHistory::from_bytes(reldir_migration_history)
|
||||
.context("failed to deserialize rel dir migration history")
|
||||
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
||||
|
||||
let v2_mode = self
|
||||
.maybe_enable_rel_size_v2(&reldir_migration_history, false)
|
||||
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
||||
|
||||
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
|
||||
if r.is_none() || r == Some(false) {
|
||||
// The dbdir entry didn't exist, or it contained a
|
||||
@@ -2325,6 +2290,15 @@ impl DatadirModification<'_> {
|
||||
sparse_rel_dir_key,
|
||||
Value::Image(RelDirExists::Exists.encode()),
|
||||
);
|
||||
tracing::info!(
|
||||
"migrated rel_size_v2: {}",
|
||||
RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode,
|
||||
forknum
|
||||
}
|
||||
);
|
||||
rel_cnt += 1;
|
||||
}
|
||||
}
|
||||
@@ -2333,6 +2307,9 @@ impl DatadirModification<'_> {
|
||||
self.lsn,
|
||||
rel_cnt
|
||||
);
|
||||
self.tline
|
||||
.update_rel_size_v2_status(RelSizeMigration::Migrating, Some(self.lsn))
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
Ok::<_, WalIngestError>(())
|
||||
}
|
||||
|
||||
@@ -2415,32 +2392,25 @@ impl DatadirModification<'_> {
|
||||
// It's possible that this is the first rel for this db in this
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
|
||||
let reldir_migration_history = self.sparse_get(REL_DIR_MIGRATION_KEY, ctx).await?;
|
||||
let mut reldir_migration_history =
|
||||
RelDirMigrationHistory::from_bytes(reldir_migration_history)
|
||||
.context("failed to deserialize rel dir migration history")
|
||||
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
||||
|
||||
let mut is_dbdir_dirty = false;
|
||||
let mut is_reldirv2_status_dirty = false;
|
||||
|
||||
let dbdir_exists =
|
||||
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
|
||||
// Didn't exist. Update dbdir
|
||||
e.insert(false);
|
||||
let buf = DbDirectory::ser(&dbdir)?;
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::Db,
|
||||
MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
|
||||
));
|
||||
is_dbdir_dirty = true;
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
false
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
let mut v2_mode = self
|
||||
.maybe_enable_rel_size_v2(&reldir_migration_history, true)
|
||||
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
||||
.maybe_enable_rel_size_v2(true)
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
|
||||
if v2_mode.initialize {
|
||||
if let Err(e) = self.initialize_rel_size_v2_keyspace(ctx, &dbdir).await {
|
||||
@@ -2448,40 +2418,8 @@ impl DatadirModification<'_> {
|
||||
// TODO: circuit breaker so that it won't retry forever
|
||||
} else {
|
||||
v2_mode.current_status = RelSizeMigration::Migrating;
|
||||
reldir_migration_history.status = RelSizeMigration::Migrating;
|
||||
reldir_migration_history.v2_enabled_at = Some(self.lsn);
|
||||
is_reldirv2_status_dirty = true;
|
||||
}
|
||||
}
|
||||
if v2_mode.disable_v1 {
|
||||
v2_mode.current_status = RelSizeMigration::Migrated;
|
||||
reldir_migration_history.status = RelSizeMigration::Migrated;
|
||||
reldir_migration_history.v1_disabled_at = Some(self.lsn);
|
||||
is_reldirv2_status_dirty = true;
|
||||
}
|
||||
|
||||
if is_dbdir_dirty {
|
||||
let buf = DbDirectory::ser(&dbdir)?;
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
}
|
||||
|
||||
if is_reldirv2_status_dirty {
|
||||
self.tline
|
||||
.update_rel_size_v2_status(
|
||||
reldir_migration_history.status.clone(),
|
||||
reldir_migration_history.v2_enabled_at,
|
||||
)
|
||||
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
||||
self.put(
|
||||
REL_DIR_MIGRATION_KEY,
|
||||
Value::Image(
|
||||
reldir_migration_history
|
||||
.encode()
|
||||
.context("failed to serialize rel dir migration history")
|
||||
.map_err(WalIngestErrorKind::RelSizeV2Error)?,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
if v2_mode.current_status != RelSizeMigration::Migrated {
|
||||
self.put_rel_creation_v1(rel, dbdir_exists, ctx).await?;
|
||||
@@ -2642,13 +2580,9 @@ impl DatadirModification<'_> {
|
||||
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), WalIngestError> {
|
||||
let reldir_migration_history = self.sparse_get(REL_DIR_MIGRATION_KEY, ctx).await?;
|
||||
let reldir_migration_history = RelDirMigrationHistory::from_bytes(reldir_migration_history)
|
||||
.context("failed to deserialize rel dir migration history")
|
||||
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
||||
let v2_mode = self
|
||||
.maybe_enable_rel_size_v2(&reldir_migration_history, false)
|
||||
.map_err(WalIngestErrorKind::RelSizeV2Error)?;
|
||||
.maybe_enable_rel_size_v2(false)
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
match v2_mode.current_status {
|
||||
RelSizeMigration::Legacy => {
|
||||
self.put_rel_drop_v1(drop_relations, ctx).await?;
|
||||
@@ -2666,12 +2600,6 @@ impl DatadirModification<'_> {
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(WalIngestError {
|
||||
kind: WalIngestErrorKind::Cancelled,
|
||||
..
|
||||
}) => {
|
||||
// Cancellation errors are fine to ignore, do not log.
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("error dropping rels: {}", e);
|
||||
}
|
||||
@@ -3221,32 +3149,6 @@ impl Version<'_> {
|
||||
|
||||
//--- Metadata structs stored in key-value pairs in the repository.
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub(crate) struct RelDirMigrationHistory {
|
||||
pub(crate) status: RelSizeMigration,
|
||||
pub(crate) v2_enabled_at: Option<Lsn>,
|
||||
pub(crate) v1_disabled_at: Option<Lsn>,
|
||||
}
|
||||
|
||||
impl RelDirMigrationHistory {
|
||||
pub(crate) fn from_bytes(bytes: Option<Bytes>) -> Result<Self, serde_json::Error> {
|
||||
match bytes {
|
||||
Some(bytes) => {
|
||||
if bytes.is_empty() {
|
||||
return Ok(Self::default());
|
||||
}
|
||||
let history = serde_json::from_slice(&bytes)?;
|
||||
Ok(history)
|
||||
}
|
||||
None => Ok(Self::default()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn encode(&self) -> Result<Bytes, serde_json::Error> {
|
||||
serde_json::to_vec(self).map(Bytes::from)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub(crate) struct DbDirectory {
|
||||
// (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
|
||||
|
||||
@@ -5090,8 +5090,7 @@ impl TenantShard {
|
||||
src_timeline.pg_version,
|
||||
);
|
||||
|
||||
let (rel_size_v2_status, rel_size_migrated_at) =
|
||||
src_timeline.get_rel_size_v2_cached_status();
|
||||
let (rel_size_v2_status, rel_size_migrated_at) = src_timeline.get_rel_size_v2_status();
|
||||
let (uninitialized_timeline, _timeline_ctx) = self
|
||||
.prepare_new_timeline(
|
||||
dst_id,
|
||||
|
||||
@@ -83,7 +83,6 @@ pub struct IndexPart {
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) last_aux_file_policy: Option<AuxFilePolicy>,
|
||||
|
||||
/// Deprecated: the field is not used anymore and the source of truth is now stored in the dbdir key.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) rel_size_migration: Option<RelSizeMigration>,
|
||||
|
||||
@@ -116,7 +115,8 @@ pub struct IndexPart {
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
|
||||
|
||||
/// Deprecated: the field is not used anymore and the source of truth is now stored in the dbdir key.
|
||||
/// The LSN at which we started the rel size migration. Accesses below this LSN should be
|
||||
/// processed with the v1 read path. Usually this LSN should be set together with `rel_size_migration`.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) rel_size_migrated_at: Option<Lsn>,
|
||||
}
|
||||
|
||||
@@ -441,7 +441,7 @@ pub struct Timeline {
|
||||
/// heatmap on demand.
|
||||
heatmap_layers_downloader: Mutex<Option<heatmap_layers_downloader::HeatmapLayersDownloader>>,
|
||||
|
||||
pub(crate) rel_size_v2_cached_status: ArcSwap<(Option<RelSizeMigration>, Option<Lsn>)>,
|
||||
pub(crate) rel_size_v2_status: ArcSwap<(Option<RelSizeMigration>, Option<Lsn>)>,
|
||||
|
||||
wait_lsn_log_slow: tokio::sync::Semaphore,
|
||||
|
||||
@@ -2883,33 +2883,19 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
|
||||
}
|
||||
|
||||
/// Returns the expected state of the rel size migration. The actual state is persisted in the
|
||||
/// DbDir key.
|
||||
///
|
||||
/// The expected state is the state that the tenant config expects.
|
||||
pub(crate) fn get_rel_size_v2_expected_state(&self) -> RelSizeMigration {
|
||||
/// Returns `true` if the rel_size_v2 config is enabled. NOTE: the write path and read path
|
||||
/// should look at `get_rel_size_v2_status()` to get the actual status of the timeline. It is
|
||||
/// possible that the index part persists the state while the config doesn't get persisted.
|
||||
pub(crate) fn get_rel_size_v2_enabled(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
let v2_enabled = tenant_conf
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.rel_size_v2_enabled
|
||||
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled);
|
||||
let v1_access_disabled = tenant_conf
|
||||
.tenant_conf
|
||||
.rel_size_v1_access_disabled
|
||||
.unwrap_or(self.conf.default_tenant_conf.rel_size_v1_access_disabled);
|
||||
|
||||
match (v2_enabled, v1_access_disabled) {
|
||||
(true, false) => RelSizeMigration::Migrating,
|
||||
(true, true) => RelSizeMigration::Migrated,
|
||||
(false, true) => RelSizeMigration::Legacy, // This should never happen
|
||||
(false, false) => RelSizeMigration::Legacy,
|
||||
}
|
||||
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
|
||||
}
|
||||
|
||||
/// DO NOT use this API in the read/write path to determine the rel size migration status. The source of truth is the dbdir key.
|
||||
/// This API is only used for the timeline info struct to get the latest cached status.
|
||||
pub(crate) fn get_rel_size_v2_cached_status(&self) -> (RelSizeMigration, Option<Lsn>) {
|
||||
let (status, migrated_at) = self.rel_size_v2_cached_status.load().as_ref().clone();
|
||||
pub(crate) fn get_rel_size_v2_status(&self) -> (RelSizeMigration, Option<Lsn>) {
|
||||
let (status, migrated_at) = self.rel_size_v2_status.load().as_ref().clone();
|
||||
(status.unwrap_or(RelSizeMigration::Legacy), migrated_at)
|
||||
}
|
||||
|
||||
@@ -3350,7 +3336,7 @@ impl Timeline {
|
||||
|
||||
heatmap_layers_downloader: Mutex::new(None),
|
||||
|
||||
rel_size_v2_cached_status: ArcSwap::from_pointee((
|
||||
rel_size_v2_status: ArcSwap::from_pointee((
|
||||
rel_size_v2_status,
|
||||
rel_size_migrated_at,
|
||||
)),
|
||||
@@ -3443,11 +3429,10 @@ impl Timeline {
|
||||
rel_size_v2_status: RelSizeMigration,
|
||||
rel_size_migrated_at: Option<Lsn>,
|
||||
) -> anyhow::Result<()> {
|
||||
self.rel_size_v2_cached_status.store(Arc::new((
|
||||
self.rel_size_v2_status.store(Arc::new((
|
||||
Some(rel_size_v2_status.clone()),
|
||||
rel_size_migrated_at,
|
||||
)));
|
||||
// The index_part upload is not used as source of truth anymore, but we still need to upload it to make it work across branches.
|
||||
self.remote_client
|
||||
.schedule_index_upload_for_rel_size_v2_status_update(
|
||||
rel_size_v2_status,
|
||||
|
||||
@@ -135,7 +135,7 @@ pub enum WalIngestErrorKind {
|
||||
#[error(transparent)]
|
||||
EncodeAuxFileError(anyhow::Error),
|
||||
#[error(transparent)]
|
||||
RelSizeV2Error(anyhow::Error),
|
||||
MaybeRelSizeV2Error(anyhow::Error),
|
||||
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
@@ -48,6 +48,8 @@ DATA = \
|
||||
neon--1.3--1.4.sql \
|
||||
neon--1.4--1.5.sql \
|
||||
neon--1.5--1.6.sql \
|
||||
neon--1.6--1.7.sql \
|
||||
neon--1.7--1.6.sql \
|
||||
neon--1.6--1.5.sql \
|
||||
neon--1.5--1.4.sql \
|
||||
neon--1.4--1.3.sql \
|
||||
|
||||
@@ -260,7 +260,7 @@ typedef struct PrefetchState
|
||||
|
||||
/* the buffers */
|
||||
prfh_hash *prf_hash;
|
||||
int max_shard_no;
|
||||
int max_unflushed_shard_no;
|
||||
/* Mark shards involved in prefetch */
|
||||
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
|
||||
PrefetchRequest prf_buffer[]; /* prefetch buffers */
|
||||
@@ -300,6 +300,7 @@ static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_
|
||||
static bool prefetch_wait_for(uint64 ring_index);
|
||||
static void prefetch_cleanup_trailing_unused(void);
|
||||
static inline void prefetch_set_unused(uint64 ring_index);
|
||||
static bool prefetch_flush_requests(void);
|
||||
|
||||
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
||||
PrefetchRequest *slot);
|
||||
@@ -469,13 +470,26 @@ communicator_prefetch_pump_state(void)
|
||||
{
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
if (MyPState->ring_receive == MyPState->ring_flush && MyPState->ring_flush < MyPState->ring_unused)
|
||||
{
|
||||
/*
|
||||
* Flush request to avoid requests pending for arbitrary long time,
|
||||
* pinning LSN and holding GC at PS.
|
||||
*/
|
||||
if (!prefetch_flush_requests())
|
||||
{
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
return;
|
||||
}
|
||||
}
|
||||
while (MyPState->ring_receive != MyPState->ring_flush)
|
||||
{
|
||||
NeonResponse *response;
|
||||
PrefetchRequest *slot;
|
||||
MemoryContext old;
|
||||
uint64 my_ring_index = MyPState->ring_receive;
|
||||
|
||||
slot = GetPrfSlot(MyPState->ring_receive);
|
||||
slot = GetPrfSlot(my_ring_index);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = page_server->try_receive(slot->shard_no);
|
||||
@@ -489,12 +503,12 @@ communicator_prefetch_pump_state(void)
|
||||
/* The slot should still be valid */
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
slot->my_ring_index != my_ring_index)
|
||||
{
|
||||
neon_shard_log(slot->shard_no, PANIC,
|
||||
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
|
||||
slot->status, slot->response,
|
||||
slot->my_ring_index, MyPState->ring_receive);
|
||||
slot->my_ring_index, my_ring_index);
|
||||
}
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
@@ -522,6 +536,19 @@ communicator_prefetch_pump_state(void)
|
||||
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/*
|
||||
* Update backend's min in-flight prefetch LSN.
|
||||
*/
|
||||
XLogRecPtr min_backend_prefetch_lsn = last_replay_lsn != InvalidXLogRecPtr ? last_replay_lsn : GetXLogReplayRecPtr(NULL);
|
||||
for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++)
|
||||
{
|
||||
PrefetchRequest* slot = GetPrfSlot(ring_index);
|
||||
min_backend_prefetch_lsn = Min(slot->request_lsns.request_lsn, min_backend_prefetch_lsn);
|
||||
}
|
||||
MIN_BACKEND_REQUEST_LSN = min_backend_prefetch_lsn;
|
||||
}
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
}
|
||||
|
||||
@@ -561,7 +588,7 @@ readahead_buffer_resize(int newsize, void *extra)
|
||||
newPState->ring_last = newsize;
|
||||
newPState->ring_unused = newsize;
|
||||
newPState->ring_receive = newsize;
|
||||
newPState->max_shard_no = MyPState->max_shard_no;
|
||||
newPState->max_unflushed_shard_no = MyPState->max_unflushed_shard_no;
|
||||
memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap));
|
||||
|
||||
/*
|
||||
@@ -661,6 +688,7 @@ consume_prefetch_responses(void)
|
||||
{
|
||||
if (MyPState->ring_receive < MyPState->ring_unused)
|
||||
prefetch_wait_for(MyPState->ring_unused - 1);
|
||||
|
||||
/*
|
||||
* We know for sure we're not working on any prefetch pages after
|
||||
* this.
|
||||
@@ -690,7 +718,7 @@ prefetch_cleanup_trailing_unused(void)
|
||||
static bool
|
||||
prefetch_flush_requests(void)
|
||||
{
|
||||
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
|
||||
for (shardno_t shard_no = 0; shard_no < MyPState->max_unflushed_shard_no; shard_no++)
|
||||
{
|
||||
if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no))
|
||||
{
|
||||
@@ -699,7 +727,8 @@ prefetch_flush_requests(void)
|
||||
BITMAP_CLR(MyPState->shard_bitmap, shard_no);
|
||||
}
|
||||
}
|
||||
MyPState->max_shard_no = 0;
|
||||
MyPState->max_unflushed_shard_no = 0;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -723,7 +752,6 @@ prefetch_wait_for(uint64 ring_index)
|
||||
{
|
||||
if (!prefetch_flush_requests())
|
||||
return false;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
@@ -802,6 +830,7 @@ prefetch_read(PrefetchRequest *slot)
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive(shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
if (response)
|
||||
{
|
||||
check_getpage_response(slot, response);
|
||||
@@ -1010,11 +1039,16 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(mySlotNo == MyPState->ring_unused);
|
||||
|
||||
if (force_request_lsns)
|
||||
{
|
||||
slot->request_lsns = *force_request_lsns;
|
||||
}
|
||||
else
|
||||
{
|
||||
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum,
|
||||
&slot->request_lsns, 1);
|
||||
last_replay_lsn = InvalidXLogRecPtr;
|
||||
}
|
||||
request.hdr.lsn = slot->request_lsns.request_lsn;
|
||||
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
|
||||
|
||||
@@ -1033,7 +1067,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
MyPState->n_unused -= 1;
|
||||
MyPState->ring_unused += 1;
|
||||
BITMAP_SET(MyPState->shard_bitmap, slot->shard_no);
|
||||
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
|
||||
MyPState->max_unflushed_shard_no = Max(slot->shard_no+1, MyPState->max_unflushed_shard_no);
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
@@ -1041,6 +1075,25 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(!found);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that returned page LSN is consistent with request lsns
|
||||
*/
|
||||
static void
|
||||
check_page_lsn(NeonGetPageResponse* resp)
|
||||
{
|
||||
if (neon_protocol_version < 3) /* no information to check */
|
||||
return;
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
|
||||
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
|
||||
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
|
||||
@@ -1064,7 +1117,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrfHashEntry *entry;
|
||||
|
||||
NeonGetPageResponse* resp;
|
||||
hashkey.buftag.blockNum = blocknum + i;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
|
||||
@@ -1097,8 +1150,9 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
continue;
|
||||
}
|
||||
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
|
||||
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
||||
|
||||
resp = (NeonGetPageResponse*)slot->response;
|
||||
check_page_lsn(resp);
|
||||
memcpy(buffers[i], resp->page, BLCKSZ);
|
||||
|
||||
/*
|
||||
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
|
||||
@@ -1391,7 +1445,6 @@ Retry:
|
||||
*/
|
||||
goto Retry;
|
||||
}
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
return last_ring_index;
|
||||
@@ -1461,10 +1514,12 @@ page_server_request(void const *req)
|
||||
MyNeonCounters->pageserver_open_requests--;
|
||||
} while (resp == NULL);
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
last_replay_lsn = InvalidXLogRecPtr;
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
last_replay_lsn = InvalidXLogRecPtr;
|
||||
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
|
||||
HOLD_INTERRUPTS();
|
||||
page_server->disconnect(shard_no);
|
||||
@@ -1864,6 +1919,13 @@ nm_to_string(NeonMessage *msg)
|
||||
return s.data;
|
||||
}
|
||||
|
||||
static void
|
||||
reset_min_request_lsn(int code, Datum arg)
|
||||
{
|
||||
if (MyProcNumber != -1)
|
||||
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* communicator_init() -- Initialize per-backend private state
|
||||
*/
|
||||
@@ -1875,6 +1937,8 @@ communicator_init(void)
|
||||
if (MyPState != NULL)
|
||||
return;
|
||||
|
||||
before_shmem_exit(reset_min_request_lsn, 0);
|
||||
|
||||
/*
|
||||
* Sanity check that theperf counters array is sized correctly. We got
|
||||
* this wrong once, and the formula for max number of backends and aux
|
||||
@@ -1884,7 +1948,7 @@ communicator_init(void)
|
||||
* the check here. That's OK, we don't expect the logic to change in old
|
||||
* releases.
|
||||
*/
|
||||
#if PG_VERSION_NUM>=150000
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
|
||||
elog(ERROR, "MyNeonCounters points past end of array");
|
||||
#endif
|
||||
@@ -2223,6 +2287,7 @@ Retry:
|
||||
case T_NeonGetPageResponse:
|
||||
{
|
||||
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
|
||||
check_page_lsn(getpage_resp);
|
||||
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
||||
|
||||
/*
|
||||
@@ -2499,12 +2564,30 @@ communicator_reconfigure_timeout_if_needed(void)
|
||||
!AmPrewarmWorker && /* do not pump prefetch state in prewarm worker */
|
||||
readahead_getpage_pull_timeout_ms > 0;
|
||||
|
||||
if (!needs_set && MIN_BACKEND_REQUEST_LSN != InvalidXLogRecPtr)
|
||||
{
|
||||
if (last_replay_lsn == InvalidXLogRecPtr)
|
||||
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
||||
else
|
||||
needs_set = true; /* Can not reset MIN_BACKEND_REQUEST_LSN now, have to do it later */
|
||||
}
|
||||
if (needs_set != timeout_set)
|
||||
{
|
||||
/* The background writer doens't (shouldn't) read any pages */
|
||||
Assert(!AmBackgroundWriterProcess());
|
||||
/* The checkpointer doens't (shouldn't) read any pages */
|
||||
Assert(!AmCheckpointerProcess());
|
||||
/*
|
||||
* The background writer/checkpointer doens't (shouldn't) read any pages.
|
||||
* And definitely they should not run on replica.
|
||||
* The only case when we can get here is replica promotion.
|
||||
*/
|
||||
if (AmBackgroundWriterProcess() || AmCheckpointerProcess())
|
||||
{
|
||||
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
||||
if (timeout_set)
|
||||
{
|
||||
disable_timeout(PS_TIMEOUT_ID, false);
|
||||
timeout_set = false;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (unlikely(PS_TIMEOUT_ID == 0))
|
||||
{
|
||||
@@ -2537,14 +2620,6 @@ communicator_reconfigure_timeout_if_needed(void)
|
||||
static void
|
||||
pagestore_timeout_handler(void)
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM <= 14
|
||||
/*
|
||||
* PG14: Setting a repeating timeout is not possible, so we signal here
|
||||
* that the timeout has already been reset, and by telling the system
|
||||
* that system will re-schedule it later if we need to.
|
||||
*/
|
||||
timeout_set = false;
|
||||
#endif
|
||||
timeout_signaled = true;
|
||||
InterruptPending = true;
|
||||
}
|
||||
@@ -2564,6 +2639,14 @@ communicator_processinterrupts(void)
|
||||
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#if PG_MAJORVERSION_NUM <= 14
|
||||
/*
|
||||
* PG14: Setting a repeating timeout is not possible, so we signal here
|
||||
* that the timeout has already been reset, and by telling the system
|
||||
* that system will re-schedule it later if we need to.
|
||||
*/
|
||||
timeout_set = false;
|
||||
#endif
|
||||
timeout_signaled = false;
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
}
|
||||
@@ -2573,3 +2656,28 @@ communicator_processinterrupts(void)
|
||||
|
||||
return prev_interrupt_cb();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
|
||||
|
||||
Datum
|
||||
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
|
||||
{
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/* Do not hold GC for primary */
|
||||
PG_RETURN_INT64(UINT64_MAX);
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr min_lsn = GetXLogReplayRecPtr(NULL);
|
||||
size_t n_procs = ProcGlobal->allProcCount;
|
||||
for (size_t i = 0; i < n_procs; i++)
|
||||
{
|
||||
if (neon_per_backend_counters_shared[i].min_request_lsn != InvalidXLogRecPtr)
|
||||
{
|
||||
min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_request_lsn);
|
||||
}
|
||||
}
|
||||
PG_RETURN_INT64(min_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
3
pgxn/neon/neon--1.6--1.7.sql
Normal file
3
pgxn/neon/neon--1.6--1.7.sql
Normal file
@@ -0,0 +1,3 @@
|
||||
create function neon_communicator_min_inflight_request_lsn() returns pg_catalog.pg_lsn
|
||||
AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn'
|
||||
LANGUAGE C;
|
||||
1
pgxn/neon/neon--1.7--1.6.sql
Normal file
1
pgxn/neon/neon--1.7--1.6.sql
Normal file
@@ -0,0 +1 @@
|
||||
drop function neon_communicator_min_inflight_request_lsn();
|
||||
@@ -42,7 +42,6 @@ NeonPerfCountersShmemRequest(void)
|
||||
}
|
||||
|
||||
|
||||
|
||||
void
|
||||
NeonPerfCountersShmemInit(void)
|
||||
{
|
||||
|
||||
@@ -154,6 +154,11 @@ typedef struct
|
||||
* Histogram of query execution time.
|
||||
*/
|
||||
QTHistogramData query_time_hist;
|
||||
|
||||
/*
|
||||
* Minimal LSN of in-fligth request requests
|
||||
*/
|
||||
XLogRecPtr min_request_lsn;
|
||||
} neon_per_backend_counters;
|
||||
|
||||
/* Pointer to the shared memory array of neon_per_backend_counters structs */
|
||||
@@ -169,6 +174,12 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
|
||||
|
||||
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProcNumber])
|
||||
|
||||
/*
|
||||
* Backend-local minimal in-flight request LSN.
|
||||
* We store it in neon_per_backend_counters_shared and not in separate array to minimize false cache sharing
|
||||
*/
|
||||
#define MIN_BACKEND_REQUEST_LSN MyNeonCounters->min_request_lsn
|
||||
|
||||
extern void inc_getpage_wait(uint64 latency);
|
||||
extern void inc_page_cache_read_wait(uint64 latency);
|
||||
extern void inc_page_cache_write_wait(uint64 latency);
|
||||
|
||||
@@ -243,6 +243,7 @@ extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern int32 max_cluster_size;
|
||||
extern int neon_protocol_version;
|
||||
extern XLogRecPtr last_replay_lsn;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
|
||||
@@ -96,6 +96,8 @@ typedef enum
|
||||
|
||||
int debug_compare_local;
|
||||
|
||||
XLogRecPtr last_replay_lsn;
|
||||
|
||||
static NRelFileInfo unlogged_build_rel_info;
|
||||
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
|
||||
@@ -159,7 +161,7 @@ log_newpages_copy(NRelFileInfo * rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
page_std);
|
||||
}
|
||||
|
||||
return ProcLastRecPtr;
|
||||
return GetXLogInsertRecPtr();
|
||||
}
|
||||
#endif /* PG_MAJORVERSION_NUM >= 17 */
|
||||
|
||||
@@ -588,6 +590,17 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
/* Request the page at the end of the last fully replayed LSN. */
|
||||
XLogRecPtr replay_lsn = GetXLogReplayRecPtr(NULL);
|
||||
|
||||
if (MIN_BACKEND_REQUEST_LSN == InvalidXLogRecPtr)
|
||||
{
|
||||
/* mark the backend's replay_lsn as "we have a request ongoing", blocking the expiration of any current LSN */
|
||||
MIN_BACKEND_REQUEST_LSN = replay_lsn;
|
||||
/* make sure memory operations are in correct order, even in concurrent systems */
|
||||
pg_memory_barrier();
|
||||
/* get the current LSN to register */
|
||||
replay_lsn = GetXLogReplayRecPtr(NULL);
|
||||
MIN_BACKEND_REQUEST_LSN = replay_lsn;
|
||||
}
|
||||
last_replay_lsn = replay_lsn;
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
neon_request_lsns *result = &output[i];
|
||||
|
||||
@@ -1312,8 +1312,8 @@ class NeonEnv:
|
||||
)
|
||||
|
||||
tenant_config = ps_cfg.setdefault("tenant_config", {})
|
||||
# Enable relsize_v2 by default in tests.
|
||||
tenant_config["rel_size_v2_enabled"] = True
|
||||
# This feature is pending rollout.
|
||||
# tenant_config["rel_size_v2_enabled"] = True
|
||||
|
||||
# Test authors tend to forget about the default 10min initial lease deadline
|
||||
# when writing tests, which turns their immediate gc requests via mgmt API
|
||||
|
||||
@@ -80,12 +80,7 @@ def test_perf_simple_many_relations_reldir(
|
||||
"""
|
||||
Test creating many relations in a single database.
|
||||
"""
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"rel_size_v2_enabled": reldir != "v1",
|
||||
"rel_size_v1_access_disabled": reldir == "v2",
|
||||
}
|
||||
)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"rel_size_v2_enabled": reldir != "v1"})
|
||||
ep = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
@@ -113,6 +108,10 @@ def test_perf_simple_many_relations_reldir(
|
||||
== "migrating"
|
||||
)
|
||||
elif reldir == "v2":
|
||||
# only read/write to the v2 keyspace
|
||||
env.pageserver.http_client().timeline_patch_index_part(
|
||||
env.initial_tenant, env.initial_timeline, {"rel_size_migration": "migrated"}
|
||||
)
|
||||
assert (
|
||||
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
|
||||
"rel_size_migration"
|
||||
|
||||
@@ -183,7 +183,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"lsn_lease_length": "1m",
|
||||
"lsn_lease_length_for_ts": "5s",
|
||||
"timeline_offloading": False,
|
||||
"rel_size_v2_enabled": False,
|
||||
"rel_size_v2_enabled": True,
|
||||
"relsize_snapshot_cache_capacity": 10000,
|
||||
"gc_compaction_enabled": False,
|
||||
"gc_compaction_verification": False,
|
||||
@@ -194,7 +194,6 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"numerator": 0,
|
||||
"denominator": 10,
|
||||
},
|
||||
"rel_size_v1_access_disabled": True,
|
||||
}
|
||||
|
||||
vps_http = env.storage_controller.pageserver_api()
|
||||
|
||||
@@ -581,10 +581,12 @@ def test_historic_storage_formats(
|
||||
# This dataset was created at a time where we decided to migrate to v2 reldir by simply disabling writes to v1
|
||||
# and starting writing to v2. This was too risky and we have reworked the migration plan. Therefore, we should
|
||||
# opt in full relv2 mode for this dataset.
|
||||
env.pageserver.http_client().patch_tenant_config(
|
||||
dataset.tenant_id,
|
||||
{"rel_size_v1_access_disabled": True, "rel_size_v2_enabled": True},
|
||||
)
|
||||
for timeline in timelines:
|
||||
env.pageserver.http_client().timeline_patch_index_part(
|
||||
dataset.tenant_id,
|
||||
timeline["timeline_id"],
|
||||
{"force_index_update": True, "rel_size_migration": "migrated"},
|
||||
)
|
||||
|
||||
# Import tenant does not create the timeline on safekeepers,
|
||||
# because it is a debug handler and the timeline may have already been
|
||||
|
||||
@@ -55,7 +55,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
|
||||
# Ensure that the default version is also updated in the neon.control file
|
||||
assert cur.fetchone() == ("1.6",)
|
||||
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
|
||||
all_versions = ["1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
|
||||
all_versions = ["1.7", "1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
|
||||
current_version = "1.6"
|
||||
for idx, begin_version in enumerate(all_versions):
|
||||
for target_version in all_versions[idx + 1 :]:
|
||||
|
||||
@@ -123,10 +123,9 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End
|
||||
def patch_tenant_conf(tenant_conf: dict[str, Any], reldir_type: str) -> dict[str, Any]:
|
||||
tenant_conf = tenant_conf.copy()
|
||||
if reldir_type == "v2":
|
||||
tenant_conf["rel_size_v2_enabled"] = True
|
||||
tenant_conf["rel_size_v1_access_disabled"] = True
|
||||
tenant_conf["rel_size_v2_enabled"] = "true"
|
||||
else:
|
||||
tenant_conf["rel_size_v2_enabled"] = False
|
||||
tenant_conf["rel_size_v2_enabled"] = "false"
|
||||
return tenant_conf
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user