mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
Merge commit '5c57e8a11' into problame/standby-horizon-leases
This commit is contained in:
@@ -1518,6 +1518,7 @@ pub struct TimelineArchivalConfigRequest {
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct TimelinePatchIndexPartRequest {
|
||||
pub rel_size_migration: Option<RelSizeMigration>,
|
||||
pub rel_size_migrated_at: Option<Lsn>,
|
||||
pub gc_compaction_last_completed_lsn: Option<Lsn>,
|
||||
pub applied_gc_cutoff_lsn: Option<Lsn>,
|
||||
#[serde(default)]
|
||||
@@ -1551,10 +1552,10 @@ pub enum RelSizeMigration {
|
||||
/// `None` is the same as `Some(RelSizeMigration::Legacy)`.
|
||||
Legacy,
|
||||
/// The tenant is migrating to the new rel_size format. Both old and new rel_size format are
|
||||
/// persisted in the index part. The read path will read both formats and merge them.
|
||||
/// persisted in the storage. The read path will read both formats and validate them.
|
||||
Migrating,
|
||||
/// The tenant has migrated to the new rel_size format. Only the new rel_size format is persisted
|
||||
/// in the index part, and the read path will not read the old format.
|
||||
/// in the storage, and the read path will not read the old format.
|
||||
Migrated,
|
||||
}
|
||||
|
||||
@@ -1637,6 +1638,7 @@ pub struct TimelineInfo {
|
||||
|
||||
/// The status of the rel_size migration.
|
||||
pub rel_size_migration: Option<RelSizeMigration>,
|
||||
pub rel_size_migrated_at: Option<Lsn>,
|
||||
|
||||
/// Whether the timeline is invisible in synthetic size calculations.
|
||||
pub is_invisible: Option<bool>,
|
||||
|
||||
@@ -484,6 +484,8 @@ async fn build_timeline_info_common(
|
||||
*timeline.get_applied_gc_cutoff_lsn(),
|
||||
);
|
||||
|
||||
let (rel_size_migration, rel_size_migrated_at) = timeline.get_rel_size_v2_status();
|
||||
|
||||
let standby_horizon = timeline.standby_horizons.dump();
|
||||
|
||||
let info = TimelineInfo {
|
||||
@@ -517,7 +519,8 @@ async fn build_timeline_info_common(
|
||||
|
||||
state,
|
||||
is_archived: Some(is_archived),
|
||||
rel_size_migration: Some(timeline.get_rel_size_v2_status()),
|
||||
rel_size_migration: Some(rel_size_migration),
|
||||
rel_size_migrated_at,
|
||||
is_invisible: Some(is_invisible),
|
||||
|
||||
walreceiver_status,
|
||||
@@ -934,9 +937,16 @@ async fn timeline_patch_index_part_handler(
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
if request_data.rel_size_migration.is_none() && request_data.rel_size_migrated_at.is_some()
|
||||
{
|
||||
return Err(ApiError::BadRequest(anyhow!(
|
||||
"updating rel_size_migrated_at without rel_size_migration is not allowed"
|
||||
)));
|
||||
}
|
||||
|
||||
if let Some(rel_size_migration) = request_data.rel_size_migration {
|
||||
timeline
|
||||
.update_rel_size_v2_status(rel_size_migration)
|
||||
.update_rel_size_v2_status(rel_size_migration, request_data.rel_size_migrated_at)
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
}
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ pub async fn import_timeline_from_postgres_datadir(
|
||||
|
||||
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
|
||||
// Then fishing out pg_control would be unnecessary
|
||||
let mut modification = tline.begin_modification(pgdata_lsn);
|
||||
let mut modification = tline.begin_modification_for_import(pgdata_lsn);
|
||||
modification.init_empty()?;
|
||||
|
||||
// Import all but pg_wal
|
||||
@@ -309,7 +309,7 @@ async fn import_wal(
|
||||
waldecoder.feed_bytes(&buf);
|
||||
|
||||
let mut nrecords = 0;
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
let mut modification = tline.begin_modification_for_import(last_lsn);
|
||||
while last_lsn <= endpoint {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
@@ -357,7 +357,7 @@ pub async fn import_basebackup_from_tar(
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
info!("importing base at {base_lsn}");
|
||||
let mut modification = tline.begin_modification(base_lsn);
|
||||
let mut modification = tline.begin_modification_for_import(base_lsn);
|
||||
modification.init_empty()?;
|
||||
|
||||
let mut pg_control: Option<ControlFileData> = None;
|
||||
@@ -457,7 +457,7 @@ pub async fn import_wal_from_tar(
|
||||
|
||||
waldecoder.feed_bytes(&bytes[offset..]);
|
||||
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
let mut modification = tline.begin_modification_for_import(last_lsn);
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
//! walingest.rs handles a few things like implicit relation creation and extension.
|
||||
//! Clarify that)
|
||||
//!
|
||||
use std::collections::{HashMap, HashSet, hash_map};
|
||||
use std::collections::{BTreeSet, HashMap, HashSet, hash_map};
|
||||
use std::ops::{ControlFlow, Range};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -227,6 +227,25 @@ impl Timeline {
|
||||
pending_nblocks: 0,
|
||||
pending_directory_entries: Vec::new(),
|
||||
pending_metadata_bytes: 0,
|
||||
is_importing_pgdata: false,
|
||||
lsn,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn begin_modification_for_import(&self, lsn: Lsn) -> DatadirModification
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
DatadirModification {
|
||||
tline: self,
|
||||
pending_lsns: Vec::new(),
|
||||
pending_metadata_pages: HashMap::new(),
|
||||
pending_data_batch: None,
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
pending_directory_entries: Vec::new(),
|
||||
pending_metadata_bytes: 0,
|
||||
is_importing_pgdata: true,
|
||||
lsn,
|
||||
}
|
||||
}
|
||||
@@ -596,6 +615,50 @@ impl Timeline {
|
||||
self.get_rel_exists_in_reldir(tag, version, None, ctx).await
|
||||
}
|
||||
|
||||
async fn get_rel_exists_in_reldir_v1(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
||||
if let Some((cached_key, dir)) = deserialized_reldir_v1 {
|
||||
if cached_key == key {
|
||||
return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
|
||||
} else if cfg!(test) || cfg!(feature = "testing") {
|
||||
panic!("cached reldir key mismatch: {cached_key} != {key}");
|
||||
} else {
|
||||
warn!("cached reldir key mismatch: {cached_key} != {key}");
|
||||
}
|
||||
// Fallback to reading the directory from the datadir.
|
||||
}
|
||||
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
|
||||
}
|
||||
|
||||
async fn get_rel_exists_in_reldir_v2(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
|
||||
let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?).map_err(
|
||||
|_| {
|
||||
PageReconstructError::Other(anyhow::anyhow!(
|
||||
"invalid reldir key: decode failed, {}",
|
||||
key
|
||||
))
|
||||
},
|
||||
)?;
|
||||
let exists_v2 = buf == RelDirExists::Exists;
|
||||
Ok(exists_v2)
|
||||
}
|
||||
|
||||
/// Does the relation exist? With a cached deserialized `RelDirectory`.
|
||||
///
|
||||
/// There are some cases where the caller loops across all relations. In that specific case,
|
||||
@@ -627,45 +690,134 @@ impl Timeline {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// Read path: first read the new reldir keyspace. Early return if the relation exists.
|
||||
// Otherwise, read the old reldir keyspace.
|
||||
// TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
|
||||
let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
|
||||
|
||||
if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
|
||||
self.get_rel_size_v2_status()
|
||||
match v2_status {
|
||||
RelSizeMigration::Legacy => {
|
||||
let v1_exists = self
|
||||
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
|
||||
.await?;
|
||||
Ok(v1_exists)
|
||||
}
|
||||
RelSizeMigration::Migrating | RelSizeMigration::Migrated
|
||||
if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
|
||||
{
|
||||
// fetch directory listing (new)
|
||||
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
|
||||
let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
|
||||
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
|
||||
let exists_v2 = buf == RelDirExists::Exists;
|
||||
// Fast path: if the relation exists in the new format, return true.
|
||||
// TODO: we should have a verification mode that checks both keyspaces
|
||||
// to ensure the relation only exists in one of them.
|
||||
if exists_v2 {
|
||||
return Ok(true);
|
||||
// For requests below the migrated LSN, we still use the v1 read path.
|
||||
let v1_exists = self
|
||||
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
|
||||
.await?;
|
||||
Ok(v1_exists)
|
||||
}
|
||||
RelSizeMigration::Migrating => {
|
||||
let v1_exists = self
|
||||
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
|
||||
.await?;
|
||||
let v2_exists_res = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await;
|
||||
match v2_exists_res {
|
||||
Ok(v2_exists) if v1_exists == v2_exists => {}
|
||||
Ok(v2_exists) => {
|
||||
tracing::warn!(
|
||||
"inconsistent v1/v2 reldir keyspace for rel {}: v1_exists={}, v2_exists={}",
|
||||
tag,
|
||||
v1_exists,
|
||||
v2_exists
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to get rel exists in v2: {e}");
|
||||
}
|
||||
}
|
||||
Ok(v1_exists)
|
||||
}
|
||||
RelSizeMigration::Migrated => {
|
||||
let v2_exists = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await?;
|
||||
Ok(v2_exists)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fetch directory listing (old)
|
||||
|
||||
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
||||
|
||||
if let Some((cached_key, dir)) = deserialized_reldir_v1 {
|
||||
if cached_key == key {
|
||||
return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
|
||||
} else if cfg!(test) || cfg!(feature = "testing") {
|
||||
panic!("cached reldir key mismatch: {cached_key} != {key}");
|
||||
} else {
|
||||
warn!("cached reldir key mismatch: {cached_key} != {key}");
|
||||
}
|
||||
// Fallback to reading the directory from the datadir.
|
||||
}
|
||||
async fn list_rels_v1(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashSet<RelTag>, PageReconstructError> {
|
||||
let key = rel_dir_to_key(spcnode, dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
|
||||
Ok(exists_v1)
|
||||
let rels_v1: HashSet<RelTag> =
|
||||
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: *relnode,
|
||||
forknum: *forknum,
|
||||
}));
|
||||
Ok(rels_v1)
|
||||
}
|
||||
|
||||
async fn list_rels_v2(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashSet<RelTag>, PageReconstructError> {
|
||||
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
);
|
||||
let results = self
|
||||
.scan(
|
||||
KeySpace::single(key_range),
|
||||
version.get_lsn(),
|
||||
ctx,
|
||||
io_concurrency,
|
||||
)
|
||||
.await?;
|
||||
let mut rels = HashSet::new();
|
||||
for (key, val) in results {
|
||||
let val = RelDirExists::decode(&val?).map_err(|_| {
|
||||
PageReconstructError::Other(anyhow::anyhow!(
|
||||
"invalid reldir key: decode failed, {}",
|
||||
key
|
||||
))
|
||||
})?;
|
||||
if key.field6 != 1 {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
"invalid reldir key: field6 != 1, {}",
|
||||
key
|
||||
)));
|
||||
}
|
||||
if key.field2 != spcnode {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
"invalid reldir key: field2 != spcnode, {}",
|
||||
key
|
||||
)));
|
||||
}
|
||||
if key.field3 != dbnode {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
"invalid reldir key: field3 != dbnode, {}",
|
||||
key
|
||||
)));
|
||||
}
|
||||
let tag = RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: key.field4,
|
||||
forknum: key.field5,
|
||||
};
|
||||
if val == RelDirExists::Removed {
|
||||
debug_assert!(!rels.contains(&tag), "removed reltag in v2");
|
||||
continue;
|
||||
}
|
||||
let did_not_contain = rels.insert(tag);
|
||||
debug_assert!(did_not_contain, "duplicate reltag in v2");
|
||||
}
|
||||
Ok(rels)
|
||||
}
|
||||
|
||||
/// Get a list of all existing relations in given tablespace and database.
|
||||
@@ -683,60 +835,45 @@ impl Timeline {
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashSet<RelTag>, PageReconstructError> {
|
||||
// fetch directory listing (old)
|
||||
let key = rel_dir_to_key(spcnode, dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
|
||||
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
let rels_v1: HashSet<RelTag> =
|
||||
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
|
||||
match v2_status {
|
||||
RelSizeMigration::Legacy => {
|
||||
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
|
||||
Ok(rels_v1)
|
||||
}
|
||||
RelSizeMigration::Migrating | RelSizeMigration::Migrated
|
||||
if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
|
||||
{
|
||||
// For requests below the migrated LSN, we still use the v1 read path.
|
||||
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
|
||||
Ok(rels_v1)
|
||||
}
|
||||
RelSizeMigration::Migrating => {
|
||||
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
|
||||
let rels_v2_res = self.list_rels_v2(spcnode, dbnode, version, ctx).await;
|
||||
match rels_v2_res {
|
||||
Ok(rels_v2) if rels_v1 == rels_v2 => {}
|
||||
Ok(rels_v2) => {
|
||||
tracing::warn!(
|
||||
"inconsistent v1/v2 reldir keyspace for db {} {}: v1_rels.len()={}, v2_rels.len()={}",
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: *relnode,
|
||||
forknum: *forknum,
|
||||
}));
|
||||
|
||||
if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
|
||||
return Ok(rels_v1);
|
||||
}
|
||||
|
||||
// scan directory listing (new), merge with the old results
|
||||
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
rels_v1.len(),
|
||||
rels_v2.len()
|
||||
);
|
||||
let results = self
|
||||
.scan(
|
||||
KeySpace::single(key_range),
|
||||
version.get_lsn(),
|
||||
ctx,
|
||||
io_concurrency,
|
||||
)
|
||||
.await?;
|
||||
let mut rels = rels_v1;
|
||||
for (key, val) in results {
|
||||
let val = RelDirExists::decode(&val?)
|
||||
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
|
||||
assert_eq!(key.field6, 1);
|
||||
assert_eq!(key.field2, spcnode);
|
||||
assert_eq!(key.field3, dbnode);
|
||||
let tag = RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: key.field4,
|
||||
forknum: key.field5,
|
||||
};
|
||||
if val == RelDirExists::Removed {
|
||||
debug_assert!(!rels.contains(&tag), "removed reltag in v2");
|
||||
continue;
|
||||
}
|
||||
let did_not_contain = rels.insert(tag);
|
||||
debug_assert!(did_not_contain, "duplicate reltag in v2");
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to list rels in v2: {e}");
|
||||
}
|
||||
}
|
||||
Ok(rels_v1)
|
||||
}
|
||||
RelSizeMigration::Migrated => {
|
||||
let rels_v2 = self.list_rels_v2(spcnode, dbnode, version, ctx).await?;
|
||||
Ok(rels_v2)
|
||||
}
|
||||
}
|
||||
Ok(rels)
|
||||
}
|
||||
|
||||
/// Get the whole SLRU segment
|
||||
@@ -1258,10 +1395,10 @@ impl Timeline {
|
||||
let mut dbdir_cnt = 0;
|
||||
let mut rel_cnt = 0;
|
||||
|
||||
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
dbdir_cnt += 1;
|
||||
for rel in self
|
||||
.list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
|
||||
.list_rels(spcnode, dbnode, Version::at(lsn), ctx)
|
||||
.await?
|
||||
{
|
||||
rel_cnt += 1;
|
||||
@@ -1566,6 +1703,9 @@ pub struct DatadirModification<'a> {
|
||||
|
||||
/// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
|
||||
pending_metadata_bytes: usize,
|
||||
|
||||
/// Whether we are importing a pgdata directory.
|
||||
is_importing_pgdata: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -1578,6 +1718,14 @@ pub enum MetricsUpdate {
|
||||
Sub(u64),
|
||||
}
|
||||
|
||||
/// Controls the behavior of the reldir keyspace.
|
||||
pub struct RelDirMode {
|
||||
// Whether we can read the v2 keyspace or not.
|
||||
current_status: RelSizeMigration,
|
||||
// Whether we should initialize the v2 keyspace or not.
|
||||
initialize: bool,
|
||||
}
|
||||
|
||||
impl DatadirModification<'_> {
|
||||
// When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
|
||||
// contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
|
||||
@@ -1933,30 +2081,49 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
|
||||
/// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
|
||||
/// we enable it, we also need to persist it in `index_part.json`.
|
||||
pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
|
||||
let status = self.tline.get_rel_size_v2_status();
|
||||
/// we enable it, we also need to persist it in `index_part.json` (initialize is true).
|
||||
///
|
||||
/// As this function is only used on the write path, we do not need to read the migrated_at
|
||||
/// field.
|
||||
pub fn maybe_enable_rel_size_v2(&mut self, is_create: bool) -> anyhow::Result<RelDirMode> {
|
||||
// TODO: define the behavior of the tenant-level config flag and use feature flag to enable this feature
|
||||
|
||||
let (status, _) = self.tline.get_rel_size_v2_status();
|
||||
let config = self.tline.get_rel_size_v2_enabled();
|
||||
match (config, status) {
|
||||
(false, RelSizeMigration::Legacy) => {
|
||||
// tenant config didn't enable it and we didn't write any reldir_v2 key yet
|
||||
Ok(false)
|
||||
Ok(RelDirMode {
|
||||
current_status: RelSizeMigration::Legacy,
|
||||
initialize: false,
|
||||
})
|
||||
}
|
||||
(false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
|
||||
(false, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
|
||||
// index_part already persisted that the timeline has enabled rel_size_v2
|
||||
Ok(true)
|
||||
Ok(RelDirMode {
|
||||
current_status: status,
|
||||
initialize: false,
|
||||
})
|
||||
}
|
||||
(true, RelSizeMigration::Legacy) => {
|
||||
// The first time we enable it, we need to persist it in `index_part.json`
|
||||
self.tline
|
||||
.update_rel_size_v2_status(RelSizeMigration::Migrating)?;
|
||||
tracing::info!("enabled rel_size_v2");
|
||||
Ok(true)
|
||||
// The caller should update the reldir status once the initialization is done.
|
||||
//
|
||||
// Only initialize the v2 keyspace on new relation creation. No initialization
|
||||
// during `timeline_create` (TODO: fix this, we should allow, but currently it
|
||||
// hits consistency issues).
|
||||
Ok(RelDirMode {
|
||||
current_status: RelSizeMigration::Legacy,
|
||||
initialize: is_create && !self.is_importing_pgdata,
|
||||
})
|
||||
}
|
||||
(true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
|
||||
(true, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
|
||||
// index_part already persisted that the timeline has enabled rel_size_v2
|
||||
// and we don't need to do anything
|
||||
Ok(true)
|
||||
Ok(RelDirMode {
|
||||
current_status: status,
|
||||
initialize: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1969,8 +2136,8 @@ impl DatadirModification<'_> {
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), WalIngestError> {
|
||||
let v2_enabled = self
|
||||
.maybe_enable_rel_size_v2()
|
||||
let v2_mode = self
|
||||
.maybe_enable_rel_size_v2(false)
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
|
||||
// Add it to the directory (if it doesn't exist already)
|
||||
@@ -1986,17 +2153,19 @@ impl DatadirModification<'_> {
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
}
|
||||
if r.is_none() {
|
||||
// Create RelDirectory
|
||||
// TODO: if we have fully migrated to v2, no need to create this directory
|
||||
if v2_mode.current_status != RelSizeMigration::Legacy {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
|
||||
}
|
||||
|
||||
// Create RelDirectory in v1 keyspace. TODO: if we have fully migrated to v2, no need to create this directory.
|
||||
// Some code path relies on this directory to be present. We should remove it once we starts to set tenants to
|
||||
// `RelSizeMigration::Migrated` state (currently we don't, all tenants will have `RelSizeMigration::Migrating`).
|
||||
let buf = RelDirectory::ser(&RelDirectory {
|
||||
rels: HashSet::new(),
|
||||
})?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
|
||||
if v2_enabled {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
|
||||
}
|
||||
self.put(
|
||||
rel_dir_to_key(spcnode, dbnode),
|
||||
Value::Image(Bytes::from(buf)),
|
||||
@@ -2103,6 +2272,109 @@ impl DatadirModification<'_> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn initialize_rel_size_v2_keyspace(
|
||||
&mut self,
|
||||
ctx: &RequestContext,
|
||||
dbdir: &DbDirectory,
|
||||
) -> Result<(), WalIngestError> {
|
||||
// Copy everything from relv1 to relv2; TODO: check if there's any key in the v2 keyspace, if so, abort.
|
||||
tracing::info!("initializing rel_size_v2 keyspace");
|
||||
let mut rel_cnt = 0;
|
||||
// relmap_exists (the value of dbdirs hashmap) does not affect the migration: we need to copy things over anyways
|
||||
for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
let rel_dir_key = rel_dir_to_key(spcnode, dbnode);
|
||||
let rel_dir = RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?;
|
||||
for (relnode, forknum) in rel_dir.rels {
|
||||
let sparse_rel_dir_key = rel_tag_sparse_key(spcnode, dbnode, relnode, forknum);
|
||||
self.put(
|
||||
sparse_rel_dir_key,
|
||||
Value::Image(RelDirExists::Exists.encode()),
|
||||
);
|
||||
tracing::info!(
|
||||
"migrated rel_size_v2: {}",
|
||||
RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode,
|
||||
forknum
|
||||
}
|
||||
);
|
||||
rel_cnt += 1;
|
||||
}
|
||||
}
|
||||
tracing::info!(
|
||||
"initialized rel_size_v2 keyspace at lsn {}: migrated {} relations",
|
||||
self.lsn,
|
||||
rel_cnt
|
||||
);
|
||||
self.tline
|
||||
.update_rel_size_v2_status(RelSizeMigration::Migrating, Some(self.lsn))
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
Ok::<_, WalIngestError>(())
|
||||
}
|
||||
|
||||
async fn put_rel_creation_v1(
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
dbdir_exists: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), WalIngestError> {
|
||||
// Reldir v1 write path
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir = if !dbdir_exists {
|
||||
// Create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
|
||||
};
|
||||
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
|
||||
}
|
||||
if !dbdir_exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn put_rel_creation_v2(
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
dbdir_exists: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), WalIngestError> {
|
||||
// Reldir v2 write path
|
||||
let sparse_rel_dir_key =
|
||||
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
|
||||
// check if the rel_dir_key exists in v2
|
||||
let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
|
||||
let val = RelDirExists::decode_option(val)
|
||||
.map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
|
||||
if val == RelDirExists::Exists {
|
||||
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
|
||||
}
|
||||
self.put(
|
||||
sparse_rel_dir_key,
|
||||
Value::Image(RelDirExists::Exists.encode()),
|
||||
);
|
||||
if !dbdir_exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a relation fork.
|
||||
///
|
||||
/// 'nblocks' is the initial size.
|
||||
@@ -2136,66 +2408,31 @@ impl DatadirModification<'_> {
|
||||
true
|
||||
};
|
||||
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir = if !dbdir_exists {
|
||||
// Create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
|
||||
};
|
||||
|
||||
let v2_enabled = self
|
||||
.maybe_enable_rel_size_v2()
|
||||
let mut v2_mode = self
|
||||
.maybe_enable_rel_size_v2(true)
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
|
||||
if v2_enabled {
|
||||
if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
|
||||
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
|
||||
}
|
||||
let sparse_rel_dir_key =
|
||||
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
|
||||
// check if the rel_dir_key exists in v2
|
||||
let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
|
||||
let val = RelDirExists::decode_option(val)
|
||||
.map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
|
||||
if val == RelDirExists::Exists {
|
||||
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
|
||||
}
|
||||
self.put(
|
||||
sparse_rel_dir_key,
|
||||
Value::Image(RelDirExists::Exists.encode()),
|
||||
);
|
||||
if !dbdir_exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
|
||||
// We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
|
||||
// TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
|
||||
// will be key not found errors if we don't create an empty one for rel_size_v2.
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
|
||||
);
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
|
||||
if v2_mode.initialize {
|
||||
if let Err(e) = self.initialize_rel_size_v2_keyspace(ctx, &dbdir).await {
|
||||
tracing::warn!("error initializing rel_size_v2 keyspace: {}", e);
|
||||
// TODO: circuit breaker so that it won't retry forever
|
||||
} else {
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
|
||||
v2_mode.current_status = RelSizeMigration::Migrating;
|
||||
}
|
||||
if !dbdir_exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
|
||||
);
|
||||
|
||||
if v2_mode.current_status != RelSizeMigration::Migrated {
|
||||
self.put_rel_creation_v1(rel, dbdir_exists, ctx).await?;
|
||||
}
|
||||
|
||||
if v2_mode.current_status != RelSizeMigration::Legacy {
|
||||
let write_v2_res = self.put_rel_creation_v2(rel, dbdir_exists, ctx).await;
|
||||
if let Err(e) = write_v2_res {
|
||||
if v2_mode.current_status == RelSizeMigration::Migrated {
|
||||
return Err(e);
|
||||
}
|
||||
tracing::warn!("error writing rel_size_v2 keyspace: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Put size
|
||||
@@ -2270,15 +2507,12 @@ impl DatadirModification<'_> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Drop some relations
|
||||
pub(crate) async fn put_rel_drops(
|
||||
async fn put_rel_drop_v1(
|
||||
&mut self,
|
||||
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), WalIngestError> {
|
||||
let v2_enabled = self
|
||||
.maybe_enable_rel_size_v2()
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
) -> Result<BTreeSet<RelTag>, WalIngestError> {
|
||||
let mut dropped_rels = BTreeSet::new();
|
||||
for ((spc_node, db_node), rel_tags) in drop_relations {
|
||||
let dir_key = rel_dir_to_key(spc_node, db_node);
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
@@ -2290,25 +2524,8 @@ impl DatadirModification<'_> {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
|
||||
dirty = true;
|
||||
dropped_rels.insert(rel_tag);
|
||||
true
|
||||
} else if v2_enabled {
|
||||
// The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
|
||||
// Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
|
||||
// logic).
|
||||
let key =
|
||||
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
|
||||
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
|
||||
.map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
|
||||
if val == RelDirExists::Exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
|
||||
// put tombstone
|
||||
self.put(key, Value::Image(RelDirExists::Removed.encode()));
|
||||
// no need to set dirty to true
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
};
|
||||
@@ -2331,7 +2548,67 @@ impl DatadirModification<'_> {
|
||||
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
|
||||
}
|
||||
}
|
||||
Ok(dropped_rels)
|
||||
}
|
||||
|
||||
async fn put_rel_drop_v2(
|
||||
&mut self,
|
||||
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeSet<RelTag>, WalIngestError> {
|
||||
let mut dropped_rels = BTreeSet::new();
|
||||
for ((spc_node, db_node), rel_tags) in drop_relations {
|
||||
for rel_tag in rel_tags {
|
||||
let key = rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
|
||||
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
|
||||
.map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
|
||||
if val == RelDirExists::Exists {
|
||||
dropped_rels.insert(rel_tag);
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
|
||||
// put tombstone
|
||||
self.put(key, Value::Image(RelDirExists::Removed.encode()));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(dropped_rels)
|
||||
}
|
||||
|
||||
/// Drop some relations
|
||||
pub(crate) async fn put_rel_drops(
|
||||
&mut self,
|
||||
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), WalIngestError> {
|
||||
let v2_mode = self
|
||||
.maybe_enable_rel_size_v2(false)
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
match v2_mode.current_status {
|
||||
RelSizeMigration::Legacy => {
|
||||
self.put_rel_drop_v1(drop_relations, ctx).await?;
|
||||
}
|
||||
RelSizeMigration::Migrating => {
|
||||
let dropped_rels_v1 = self.put_rel_drop_v1(drop_relations.clone(), ctx).await?;
|
||||
let dropped_rels_v2_res = self.put_rel_drop_v2(drop_relations, ctx).await;
|
||||
match dropped_rels_v2_res {
|
||||
Ok(dropped_rels_v2) => {
|
||||
if dropped_rels_v1 != dropped_rels_v2 {
|
||||
tracing::warn!(
|
||||
"inconsistent v1/v2 rel drop: dropped_rels_v1.len()={}, dropped_rels_v2.len()={}",
|
||||
dropped_rels_v1.len(),
|
||||
dropped_rels_v2.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("error dropping rels: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
RelSizeMigration::Migrated => {
|
||||
self.put_rel_drop_v2(drop_relations, ctx).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1231,6 +1231,7 @@ impl TenantShard {
|
||||
idempotency.clone(),
|
||||
index_part.gc_compaction.clone(),
|
||||
index_part.rel_size_migration.clone(),
|
||||
index_part.rel_size_migrated_at,
|
||||
ctx,
|
||||
)?;
|
||||
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
|
||||
@@ -2610,6 +2611,7 @@ impl TenantShard {
|
||||
initdb_lsn,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -2939,6 +2941,7 @@ impl TenantShard {
|
||||
initdb_lsn,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -4392,6 +4395,7 @@ impl TenantShard {
|
||||
create_idempotency: CreateTimelineIdempotency,
|
||||
gc_compaction_state: Option<GcCompactionState>,
|
||||
rel_size_v2_status: Option<RelSizeMigration>,
|
||||
rel_size_migrated_at: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(Arc<Timeline>, RequestContext)> {
|
||||
let state = match cause {
|
||||
@@ -4426,6 +4430,7 @@ impl TenantShard {
|
||||
create_idempotency,
|
||||
gc_compaction_state,
|
||||
rel_size_v2_status,
|
||||
rel_size_migrated_at,
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
|
||||
@@ -5138,6 +5143,7 @@ impl TenantShard {
|
||||
src_timeline.pg_version,
|
||||
);
|
||||
|
||||
let (rel_size_v2_status, rel_size_migrated_at) = src_timeline.get_rel_size_v2_status();
|
||||
let (uninitialized_timeline, _timeline_ctx) = self
|
||||
.prepare_new_timeline(
|
||||
dst_id,
|
||||
@@ -5145,7 +5151,8 @@ impl TenantShard {
|
||||
timeline_create_guard,
|
||||
start_lsn + 1,
|
||||
Some(Arc::clone(src_timeline)),
|
||||
Some(src_timeline.get_rel_size_v2_status()),
|
||||
Some(rel_size_v2_status),
|
||||
rel_size_migrated_at,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -5432,6 +5439,7 @@ impl TenantShard {
|
||||
pgdata_lsn,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -5515,14 +5523,17 @@ impl TenantShard {
|
||||
start_lsn: Lsn,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
rel_size_v2_status: Option<RelSizeMigration>,
|
||||
rel_size_migrated_at: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(UninitializedTimeline<'a>, RequestContext)> {
|
||||
let tenant_shard_id = self.tenant_shard_id;
|
||||
|
||||
let resources = self.build_timeline_resources(new_timeline_id);
|
||||
resources
|
||||
.remote_client
|
||||
.init_upload_queue_for_empty_remote(new_metadata, rel_size_v2_status.clone())?;
|
||||
resources.remote_client.init_upload_queue_for_empty_remote(
|
||||
new_metadata,
|
||||
rel_size_v2_status.clone(),
|
||||
rel_size_migrated_at,
|
||||
)?;
|
||||
|
||||
let (timeline_struct, timeline_ctx) = self
|
||||
.create_timeline_struct(
|
||||
@@ -5535,6 +5546,7 @@ impl TenantShard {
|
||||
create_guard.idempotency.clone(),
|
||||
None,
|
||||
rel_size_v2_status,
|
||||
rel_size_migrated_at,
|
||||
ctx,
|
||||
)
|
||||
.context("Failed to create timeline data structure")?;
|
||||
|
||||
@@ -443,7 +443,8 @@ impl RemoteTimelineClient {
|
||||
pub fn init_upload_queue_for_empty_remote(
|
||||
&self,
|
||||
local_metadata: &TimelineMetadata,
|
||||
rel_size_v2_status: Option<RelSizeMigration>,
|
||||
rel_size_v2_migration: Option<RelSizeMigration>,
|
||||
rel_size_migrated_at: Option<Lsn>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Set the maximum number of inprogress tasks to the remote storage concurrency. There's
|
||||
// certainly no point in starting more upload tasks than this.
|
||||
@@ -455,7 +456,8 @@ impl RemoteTimelineClient {
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
let initialized_queue =
|
||||
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
|
||||
initialized_queue.dirty.rel_size_migration = rel_size_v2_status;
|
||||
initialized_queue.dirty.rel_size_migration = rel_size_v2_migration;
|
||||
initialized_queue.dirty.rel_size_migrated_at = rel_size_migrated_at;
|
||||
self.update_remote_physical_size_gauge(None);
|
||||
info!("initialized upload queue as empty");
|
||||
Ok(())
|
||||
@@ -994,10 +996,12 @@ impl RemoteTimelineClient {
|
||||
pub(crate) fn schedule_index_upload_for_rel_size_v2_status_update(
|
||||
self: &Arc<Self>,
|
||||
rel_size_v2_status: RelSizeMigration,
|
||||
rel_size_migrated_at: Option<Lsn>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
upload_queue.dirty.rel_size_migration = Some(rel_size_v2_status);
|
||||
upload_queue.dirty.rel_size_migrated_at = rel_size_migrated_at;
|
||||
// TODO: allow this operation to bypass the validation check because we might upload the index part
|
||||
// with no layers but the flag updated. For now, we just modify the index part in memory and the next
|
||||
// upload will include the flag.
|
||||
|
||||
@@ -114,6 +114,11 @@ pub struct IndexPart {
|
||||
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
|
||||
|
||||
/// The LSN at which we started the rel size migration. Accesses below this LSN should be
|
||||
/// processed with the v1 read path. Usually this LSN should be set together with `rel_size_migration`.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) rel_size_migrated_at: Option<Lsn>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
@@ -142,10 +147,12 @@ impl IndexPart {
|
||||
/// - 12: +l2_lsn
|
||||
/// - 13: +gc_compaction
|
||||
/// - 14: +marked_invisible_at
|
||||
const LATEST_VERSION: usize = 14;
|
||||
/// - 15: +rel_size_migrated_at
|
||||
const LATEST_VERSION: usize = 15;
|
||||
|
||||
// Versions we may see when reading from a bucket.
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
|
||||
pub const KNOWN_VERSIONS: &'static [usize] =
|
||||
&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
|
||||
|
||||
pub const FILE_NAME: &'static str = "index_part.json";
|
||||
|
||||
@@ -165,6 +172,7 @@ impl IndexPart {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -475,6 +483,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -524,6 +533,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -574,6 +584,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -627,6 +638,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
|
||||
@@ -675,6 +687,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -726,6 +739,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -782,6 +796,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -843,6 +858,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -905,6 +921,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -972,6 +989,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1052,6 +1070,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1133,6 +1152,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1220,6 +1240,7 @@ mod tests {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: None,
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1308,6 +1329,97 @@ mod tests {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
rel_size_migrated_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v15_rel_size_migrated_at_is_parsed() {
|
||||
let example = r#"{
|
||||
"version": 15,
|
||||
"layer_metadata":{
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata": {
|
||||
"disk_consistent_lsn": "0/16960E8",
|
||||
"prev_record_lsn": "0/1696070",
|
||||
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
|
||||
"ancestor_lsn": "0/0",
|
||||
"latest_gc_cutoff_lsn": "0/1696070",
|
||||
"initdb_lsn": "0/1696070",
|
||||
"pg_version": 14
|
||||
},
|
||||
"gc_blocking": {
|
||||
"started_at": "2024-07-19T09:00:00.123",
|
||||
"reasons": ["DetachAncestor"]
|
||||
},
|
||||
"import_pgdata": {
|
||||
"V1": {
|
||||
"Done": {
|
||||
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
|
||||
"started_at": "2024-11-13T09:23:42.123",
|
||||
"finished_at": "2024-11-13T09:42:23.123"
|
||||
}
|
||||
}
|
||||
},
|
||||
"rel_size_migration": "legacy",
|
||||
"l2_lsn": "0/16960E8",
|
||||
"gc_compaction": {
|
||||
"last_completed_lsn": "0/16960E8"
|
||||
},
|
||||
"marked_invisible_at": "2023-07-31T09:00:00.123",
|
||||
"rel_size_migrated_at": "0/16960E8"
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
version: 15,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::new(
|
||||
Lsn::from_str("0/16960E8").unwrap(),
|
||||
Some(Lsn::from_str("0/1696070").unwrap()),
|
||||
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
|
||||
Lsn::INVALID,
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
PgMajorVersion::PG14,
|
||||
).with_recalculated_checksum().unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Default::default(),
|
||||
gc_blocking: Some(GcBlocking {
|
||||
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
|
||||
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
|
||||
}),
|
||||
last_aux_file_policy: Default::default(),
|
||||
archived_at: None,
|
||||
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
|
||||
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
|
||||
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
|
||||
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
|
||||
}))),
|
||||
rel_size_migration: Some(RelSizeMigration::Legacy),
|
||||
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
|
||||
gc_compaction: Some(GcCompactionState {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
rel_size_migrated_at: Some("0/16960E8".parse::<Lsn>().unwrap()),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
|
||||
@@ -442,7 +442,7 @@ pub struct Timeline {
|
||||
/// heatmap on demand.
|
||||
heatmap_layers_downloader: Mutex<Option<heatmap_layers_downloader::HeatmapLayersDownloader>>,
|
||||
|
||||
pub(crate) rel_size_v2_status: ArcSwapOption<RelSizeMigration>,
|
||||
pub(crate) rel_size_v2_status: ArcSwap<(Option<RelSizeMigration>, Option<Lsn>)>,
|
||||
|
||||
wait_lsn_log_slow: tokio::sync::Semaphore,
|
||||
|
||||
@@ -2941,12 +2941,9 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
|
||||
}
|
||||
|
||||
pub(crate) fn get_rel_size_v2_status(&self) -> RelSizeMigration {
|
||||
self.rel_size_v2_status
|
||||
.load()
|
||||
.as_ref()
|
||||
.map(|s| s.as_ref().clone())
|
||||
.unwrap_or(RelSizeMigration::Legacy)
|
||||
pub(crate) fn get_rel_size_v2_status(&self) -> (RelSizeMigration, Option<Lsn>) {
|
||||
let (status, migrated_at) = self.rel_size_v2_status.load().as_ref().clone();
|
||||
(status.unwrap_or(RelSizeMigration::Legacy), migrated_at)
|
||||
}
|
||||
|
||||
fn get_compaction_upper_limit(&self) -> usize {
|
||||
@@ -3221,6 +3218,7 @@ impl Timeline {
|
||||
create_idempotency: crate::tenant::CreateTimelineIdempotency,
|
||||
gc_compaction_state: Option<GcCompactionState>,
|
||||
rel_size_v2_status: Option<RelSizeMigration>,
|
||||
rel_size_migrated_at: Option<Lsn>,
|
||||
cancel: CancellationToken,
|
||||
) -> Arc<Self> {
|
||||
let disk_consistent_lsn = metadata.disk_consistent_lsn();
|
||||
@@ -3383,7 +3381,10 @@ impl Timeline {
|
||||
|
||||
heatmap_layers_downloader: Mutex::new(None),
|
||||
|
||||
rel_size_v2_status: ArcSwapOption::from_pointee(rel_size_v2_status),
|
||||
rel_size_v2_status: ArcSwap::from_pointee((
|
||||
rel_size_v2_status,
|
||||
rel_size_migrated_at,
|
||||
)),
|
||||
|
||||
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
|
||||
|
||||
@@ -3473,11 +3474,17 @@ impl Timeline {
|
||||
pub(crate) fn update_rel_size_v2_status(
|
||||
&self,
|
||||
rel_size_v2_status: RelSizeMigration,
|
||||
rel_size_migrated_at: Option<Lsn>,
|
||||
) -> anyhow::Result<()> {
|
||||
self.rel_size_v2_status
|
||||
.store(Some(Arc::new(rel_size_v2_status.clone())));
|
||||
self.rel_size_v2_status.store(Arc::new((
|
||||
Some(rel_size_v2_status.clone()),
|
||||
rel_size_migrated_at,
|
||||
)));
|
||||
self.remote_client
|
||||
.schedule_index_upload_for_rel_size_v2_status_update(rel_size_v2_status)
|
||||
.schedule_index_upload_for_rel_size_v2_status_update(
|
||||
rel_size_v2_status,
|
||||
rel_size_migrated_at,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn get_gc_compaction_state(&self) -> Option<GcCompactionState> {
|
||||
|
||||
@@ -332,6 +332,7 @@ impl DeleteTimelineFlow {
|
||||
crate::tenant::CreateTimelineIdempotency::FailWithConflict, // doesn't matter what we put here
|
||||
None, // doesn't matter what we put here
|
||||
None, // doesn't matter what we put here
|
||||
None, // doesn't matter what we put here
|
||||
ctx,
|
||||
)
|
||||
.context("create_timeline_struct")?;
|
||||
|
||||
@@ -5,7 +5,7 @@ import pytest
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
from fixtures.compare_fixtures import RemoteCompare
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
|
||||
from fixtures.utils import shared_buffers_for_max_cu
|
||||
|
||||
|
||||
@@ -69,13 +69,20 @@ def test_perf_many_relations(remote_compare: RemoteCompare, num_relations: int):
|
||||
)
|
||||
|
||||
|
||||
def test_perf_simple_many_relations_reldir_v2(
|
||||
neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
|
||||
@pytest.mark.parametrize(
|
||||
"reldir,num_relations",
|
||||
[("v1", 10000), ("v1v2", 10000), ("v2", 10000), ("v2", 100000)],
|
||||
ids=["v1-small", "v1v2-small", "v2-small", "v2-large"],
|
||||
)
|
||||
def test_perf_simple_many_relations_reldir(
|
||||
neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, reldir: str, num_relations: int
|
||||
):
|
||||
"""
|
||||
Test creating many relations in a single database.
|
||||
"""
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"rel_size_v2_enabled": "true"})
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={"rel_size_v2_enabled": "true" if reldir != "v1" else "false"}
|
||||
)
|
||||
ep = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
@@ -85,14 +92,38 @@ def test_perf_simple_many_relations_reldir_v2(
|
||||
],
|
||||
)
|
||||
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS initial_table (v1 int)")
|
||||
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
if reldir == "v1":
|
||||
assert (
|
||||
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
|
||||
"rel_size_migration"
|
||||
]
|
||||
!= "legacy"
|
||||
== "legacy"
|
||||
)
|
||||
elif reldir == "v1v2":
|
||||
assert (
|
||||
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
|
||||
"rel_size_migration"
|
||||
]
|
||||
== "migrating"
|
||||
)
|
||||
elif reldir == "v2":
|
||||
# only read/write to the v2 keyspace
|
||||
env.pageserver.http_client().timeline_patch_index_part(
|
||||
env.initial_tenant, env.initial_timeline, {"rel_size_migration": "migrated"}
|
||||
)
|
||||
assert (
|
||||
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
|
||||
"rel_size_migration"
|
||||
]
|
||||
== "migrated"
|
||||
)
|
||||
else:
|
||||
raise AssertionError(f"Invalid reldir config: {reldir}")
|
||||
|
||||
n = 100000
|
||||
n = num_relations
|
||||
step = 5000
|
||||
# Create many relations
|
||||
log.info(f"Creating {n} relations...")
|
||||
|
||||
@@ -538,6 +538,7 @@ def test_historic_storage_formats(
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
neon_env_builder.pg_version = dataset.pg_version
|
||||
env = neon_env_builder.init_configs()
|
||||
|
||||
env.start()
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage)
|
||||
|
||||
@@ -576,6 +577,17 @@ def test_historic_storage_formats(
|
||||
# All our artifacts should contain at least one timeline
|
||||
assert len(timelines) > 0
|
||||
|
||||
if dataset.name == "2025-04-08-tenant-manifest-v1":
|
||||
# This dataset was created at a time where we decided to migrate to v2 reldir by simply disabling writes to v1
|
||||
# and starting writing to v2. This was too risky and we have reworked the migration plan. Therefore, we should
|
||||
# opt in full relv2 mode for this dataset.
|
||||
for timeline in timelines:
|
||||
env.pageserver.http_client().timeline_patch_index_part(
|
||||
dataset.tenant_id,
|
||||
timeline["timeline_id"],
|
||||
{"force_index_update": True, "rel_size_migration": "migrated"},
|
||||
)
|
||||
|
||||
# Import tenant does not create the timeline on safekeepers,
|
||||
# because it is a debug handler and the timeline may have already been
|
||||
# created on some set of safekeepers.
|
||||
|
||||
@@ -457,21 +457,6 @@ def test_tx_abort_with_many_relations(
|
||||
],
|
||||
)
|
||||
|
||||
if reldir_type == "v1":
|
||||
assert (
|
||||
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
|
||||
"rel_size_migration"
|
||||
]
|
||||
== "legacy"
|
||||
)
|
||||
else:
|
||||
assert (
|
||||
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
|
||||
"rel_size_migration"
|
||||
]
|
||||
!= "legacy"
|
||||
)
|
||||
|
||||
# How many relations: this number is tuned to be long enough to take tens of seconds
|
||||
# if the rollback code path is buggy, tripping the test's timeout.
|
||||
n = 5000
|
||||
@@ -556,3 +541,19 @@ def test_tx_abort_with_many_relations(
|
||||
except:
|
||||
exec.shutdown(wait=False, cancel_futures=True)
|
||||
raise
|
||||
|
||||
# Do the check after everything is done, because the reldirv2 transition won't happen until create table.
|
||||
if reldir_type == "v1":
|
||||
assert (
|
||||
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
|
||||
"rel_size_migration"
|
||||
]
|
||||
== "legacy"
|
||||
)
|
||||
else:
|
||||
assert (
|
||||
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
|
||||
"rel_size_migration"
|
||||
]
|
||||
!= "legacy"
|
||||
)
|
||||
|
||||
@@ -7,6 +7,8 @@ if TYPE_CHECKING:
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
|
||||
from fixtures.neon_fixtures import wait_for_last_flush_lsn
|
||||
|
||||
|
||||
def test_pageserver_reldir_v2(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
@@ -65,6 +67,8 @@ def test_pageserver_reldir_v2(
|
||||
endpoint.safe_psql("CREATE TABLE foo4 (id INTEGER PRIMARY KEY, val text)")
|
||||
# Delete a relation in v1
|
||||
endpoint.safe_psql("DROP TABLE foo1")
|
||||
# wait pageserver to apply the LSN
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
# Check if both relations are still accessible
|
||||
endpoint.safe_psql("SELECT * FROM foo2")
|
||||
@@ -76,12 +80,16 @@ def test_pageserver_reldir_v2(
|
||||
# This will acquire a basebackup, which lists all relations.
|
||||
endpoint.start()
|
||||
|
||||
# Check if both relations are still accessible
|
||||
# Check if both relations are still accessible after restart
|
||||
endpoint.safe_psql("DROP TABLE IF EXISTS foo1")
|
||||
endpoint.safe_psql("SELECT * FROM foo2")
|
||||
endpoint.safe_psql("SELECT * FROM foo3")
|
||||
endpoint.safe_psql("SELECT * FROM foo4")
|
||||
endpoint.safe_psql("DROP TABLE foo3")
|
||||
# wait pageserver to apply the LSN
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
# Restart the endpoint again
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
@@ -99,6 +107,9 @@ def test_pageserver_reldir_v2(
|
||||
},
|
||||
)
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
# Check if the relation is still accessible
|
||||
endpoint.safe_psql("SELECT * FROM foo2")
|
||||
endpoint.safe_psql("SELECT * FROM foo4")
|
||||
@@ -111,3 +122,10 @@ def test_pageserver_reldir_v2(
|
||||
]
|
||||
== "migrating"
|
||||
)
|
||||
|
||||
assert (
|
||||
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
|
||||
"rel_size_migrated_at"
|
||||
]
|
||||
is not None
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user