refactor(pageserver): remove aux v1 code path (#9424)

Part of the aux v1 retirement
https://github.com/neondatabase/neon/issues/8623

## Summary of changes

Remove the write/read path for aux v1, but keep the config item and the
index part field for now.
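
For reference, the read path after this change no longer branches on the aux file
policy: `Timeline::list_aux_files` delegates straight to the v2 implementation, and
the v1 and cross-validation branches are deleted. A minimal sketch of the resulting
shape (signatures as they appear in the diff below):

```rust
impl Timeline {
    /// Simplified read path after this PR: only the aux v2 keyspace is consulted;
    /// the v1 fallback and the CrossValidation comparison are gone.
    pub(crate) async fn list_aux_files(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
        self.list_aux_files_v2(lsn, ctx).await
    }
}
```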

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Author: Alex Chi Z.
Date: 2024-10-17 12:22:44 -04:00 (committed by GitHub)
Parent: 858867c627
Commit: 63b3491c1b
10 changed files with 60 additions and 897 deletions


@@ -743,8 +743,6 @@ pub struct TimelineInfo {
// Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
// not deny unknown fields by default so it's safe to set the field to some value, though it won't be
// read.
/// The last aux file policy being used on this timeline
pub last_aux_file_policy: Option<AuxFilePolicy>,
pub is_archived: Option<bool>,
}


@@ -18,7 +18,6 @@ use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
@@ -474,8 +473,6 @@ async fn build_timeline_info_common(
is_archived: Some(is_archived),
walreceiver_status,
last_aux_file_policy: timeline.last_aux_file_policy.load(),
};
Ok(info)
}
@@ -2399,31 +2396,6 @@ async fn post_tracing_event_handler(
json_response(StatusCode::OK, ())
}
async fn force_aux_policy_switch_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?;
let policy: AuxFilePolicy = json_request(&mut r).await?;
let state = get_state(&r);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
timeline
.do_switch_aux_policy(policy)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn put_io_engine_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
@@ -3136,10 +3108,6 @@ pub fn make_router(
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),
)
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",


@@ -22,7 +22,6 @@ use pageserver_api::key::{
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
@@ -33,7 +32,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -677,21 +676,6 @@ impl Timeline {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
async fn list_aux_files_v1(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get(AUX_FILES_KEY, lsn, ctx).await {
Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
Ok(HashMap::new())
}
}
}
async fn list_aux_files_v2(
&self,
lsn: Lsn,
@@ -722,10 +706,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
let current_policy = self.last_aux_file_policy.load();
if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
self.list_aux_files_v2(lsn, ctx).await?;
}
self.list_aux_files_v2(lsn, ctx).await?;
Ok(())
}
@@ -734,51 +715,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
let current_policy = self.last_aux_file_policy.load();
match current_policy {
Some(AuxFilePolicy::V1) => {
let res = self.list_aux_files_v1(lsn, ctx).await?;
let empty_str = if res.is_empty() { ", empty" } else { "" };
warn!(
"this timeline is using deprecated aux file policy V1 (policy=v1{empty_str})"
);
Ok(res)
}
None => {
let res = self.list_aux_files_v1(lsn, ctx).await?;
if !res.is_empty() {
warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
}
Ok(res)
}
Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
Some(AuxFilePolicy::CrossValidation) => {
let v1_result = self.list_aux_files_v1(lsn, ctx).await;
let v2_result = self.list_aux_files_v2(lsn, ctx).await;
match (v1_result, v2_result) {
(Ok(v1), Ok(v2)) => {
if v1 != v2 {
tracing::error!(
"unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
);
return Err(PageReconstructError::Other(anyhow::anyhow!(
"unmatched aux file v1 v2 result"
)));
}
Ok(v1)
}
(Ok(_), Err(v2)) => {
tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
Err(v2)
}
(Err(v1), Ok(_)) => {
tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
Err(v1)
}
(Err(_), Err(v2)) => Err(v2),
}
}
}
self.list_aux_files_v2(lsn, ctx).await
}
pub(crate) async fn get_replorigins(
@@ -954,9 +891,6 @@ impl Timeline {
result.add_key(CONTROLFILE_KEY);
result.add_key(CHECKPOINT_KEY);
if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
result.add_key(AUX_FILES_KEY);
}
// Add extra keyspaces in the test cases. Some test cases write keys into the storage without
// creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
@@ -1166,9 +1100,6 @@ impl<'a> DatadirModification<'a> {
self.pending_directory_entries.push((DirectoryKind::Db, 0));
self.put(DBDIR_KEY, Value::Image(buf.into()));
// Create AuxFilesDirectory
self.init_aux_dir()?;
let buf = if self.tline.pg_version >= 17 {
TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
xids: HashSet::new(),
@@ -1347,9 +1278,6 @@ impl<'a> DatadirModification<'a> {
// 'true', now write the updated 'dbdirs' map back.
let buf = DbDirectory::ser(&dbdir)?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
// Create AuxFilesDirectory as well
self.init_aux_dir()?;
}
if r.is_none() {
// Create RelDirectory
@@ -1726,200 +1654,60 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
return Ok(());
}
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
files: HashMap::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, 0));
self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
Ok(())
}
pub async fn put_file(
&mut self,
path: &str,
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let switch_policy = self.tline.get_switch_aux_file_policy();
let policy = {
let current_policy = self.tline.last_aux_file_policy.load();
// Allowed switch path:
// * no aux files -> v1/v2/cross-validation
// * cross-validation->v2
let current_policy = if current_policy.is_none() {
// This path will only be hit once per tenant: we will decide the final policy in this code block.
// The next call to `put_file` will always have `last_aux_file_policy != None`.
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
if aux_files_key_v1.is_empty() {
None
} else {
warn!("this timeline is using deprecated aux file policy V1 (detected existing v1 files)");
self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
Some(AuxFilePolicy::V1)
}
} else {
current_policy
};
if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
self.tline.do_switch_aux_policy(switch_policy)?;
info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
switch_policy
} else {
// This branch handles invalid migration paths, and the case that switch_policy == current_policy.
// And actually, because the migration path always allows unspecified -> *, this unwrap_or will never be hit.
current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
}
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
Ok(val) => Some(val),
Err(PageReconstructError::MissingKey(_)) => None,
Err(e) => return Err(e.into()),
};
if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
Ok(val) => Some(val),
Err(PageReconstructError::MissingKey(_)) => None,
Err(e) => return Err(e.into()),
};
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
} else {
Vec::new()
};
let mut other_files = Vec::with_capacity(files.len());
let mut modifying_file = None;
for file @ (p, content) in files {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
);
modifying_file = Some(content);
} else {
Vec::new()
};
let mut other_files = Vec::with_capacity(files.len());
let mut modifying_file = None;
for file @ (p, content) in files {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
);
modifying_file = Some(content);
} else {
other_files.push(file);
}
other_files.push(file);
}
let mut new_files = other_files;
match (modifying_file, content.is_empty()) {
(Some(old_content), false) => {
self.tline
.aux_file_size_estimator
.on_update(old_content.len(), content.len());
new_files.push((path, content));
}
(Some(old_content), true) => {
self.tline
.aux_file_size_estimator
.on_remove(old_content.len());
// not adding the file key to the final `new_files` vec.
}
(None, false) => {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
self.put(key, Value::Image(new_val.into()));
}
if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
let file_path = path.to_string();
let content = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
};
let n_files;
let mut aux_files = self.tline.aux_files.lock().await;
if let Some(mut dir) = aux_files.dir.take() {
// We already updated aux files in `self`: emit a delta and update our latest value.
dir.upsert(file_path.clone(), content.clone());
n_files = dir.files.len();
if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
aux_files.n_deltas = 0;
} else {
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
);
aux_files.n_deltas += 1;
}
aux_files.dir = Some(dir);
} else {
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(dir_bytes) => {
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
n_files = dir.files.len();
aux_files.dir = Some(dir);
}
Err(
e @ (PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
// the original code assumes all other errors are missing keys. Therefore, we keep the code path
// the same for now, though in theory, we should only match the `MissingKey` variant.
Err(
e @ (PageReconstructError::Other(_)
| PageReconstructError::WalRedo(_)
| PageReconstructError::MissingKey(_)),
) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
if !matches!(e, PageReconstructError::MissingKey(_)) {
let e = utils::error::report_compact_sources(&e);
tracing::warn!("treating error as if it was a missing key: {}", e);
}
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(file_path, content);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
n_files = 1;
aux_files.dir = Some(dir);
}
}
let mut new_files = other_files;
match (modifying_file, content.is_empty()) {
(Some(old_content), false) => {
self.tline
.aux_file_size_estimator
.on_update(old_content.len(), content.len());
new_files.push((path, content));
}
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, n_files));
(Some(old_content), true) => {
self.tline
.aux_file_size_estimator
.on_remove(old_content.len());
// not adding the file key to the final `new_files` vec.
}
(None, false) => {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
self.put(key, Value::Image(new_val.into()));
Ok(())
}
@@ -2089,12 +1877,6 @@ impl<'a> DatadirModification<'a> {
self.tline.get(key, lsn, ctx).await
}
/// Only used during unit tests, force putting a key into the modification.
#[cfg(test)]
pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
self.put(key, val);
}
fn put(&mut self, key: Key, val: Value) {
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)
@@ -2212,21 +1994,6 @@ struct RelDirectory {
rels: HashSet<(Oid, u8)>,
}
#[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
pub(crate) struct AuxFilesDirectory {
pub(crate) files: HashMap<String, Bytes>,
}
impl AuxFilesDirectory {
pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
if let Some(value) = value {
self.files.insert(key, value);
} else {
self.files.remove(&key);
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct RelSizeEntry {
nblocks: u32,


@@ -20,7 +20,6 @@ use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::LsnLease;
use pageserver_api::models::TimelineArchivalState;
use pageserver_api::models::TimelineState;
@@ -800,7 +799,6 @@ impl Tenant {
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
@@ -811,10 +809,6 @@ impl Tenant {
ancestor.clone(),
resources,
CreateTimelineCause::Load,
// This could be derived from ancestor branch + index part. Though the only caller of `timeline_init_and_sync` is `load_remote_timeline`,
// there will potentially be other callers of this function in the future, and we don't know whether `index_part` or `ancestor` takes precedence.
// Therefore, we pass this field explicitly for now, and remove it once we fully migrate to aux file v2.
last_aux_file_policy,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
@@ -829,10 +823,6 @@ impl Tenant {
if let Some(index_part) = index_part.as_ref() {
timeline.remote_client.init_upload_queue(index_part)?;
timeline
.last_aux_file_policy
.store(index_part.last_aux_file_policy());
} else {
// No data on the remote storage, but we have local metadata file. We can end up
// here with timeline_create being interrupted before finishing index part upload.
@@ -1403,15 +1393,12 @@ impl Tenant {
None
};
let last_aux_file_policy = index_part.last_aux_file_policy();
self.timeline_init_and_sync(
timeline_id,
resources,
Some(index_part),
remote_metadata,
ancestor,
last_aux_file_policy,
ctx,
)
.await
@@ -1824,7 +1811,6 @@ impl Tenant {
create_guard,
initdb_lsn,
None,
None,
)
.await
}
@@ -3032,7 +3018,6 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
cause: CreateTimelineCause,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
CreateTimelineCause::Load => {
@@ -3061,7 +3046,6 @@ impl Tenant {
resources,
pg_version,
state,
last_aux_file_policy,
self.attach_wal_lag_cooldown.clone(),
self.cancel.child_token(),
);
@@ -3720,7 +3704,6 @@ impl Tenant {
timeline_create_guard,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
src_timeline.last_aux_file_policy.load(),
)
.await?;
@@ -3914,7 +3897,6 @@ impl Tenant {
timeline_create_guard,
pgdata_lsn,
None,
None,
)
.await?;
@@ -3986,7 +3968,6 @@ impl Tenant {
create_guard: TimelineCreateGuard<'a>,
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<UninitializedTimeline<'a>> {
let tenant_shard_id = self.tenant_shard_id;
@@ -4002,7 +3983,6 @@ impl Tenant {
ancestor,
resources,
CreateTimelineCause::Load,
last_aux_file_policy,
)
.context("Failed to create timeline data structure")?;
@@ -4600,7 +4580,6 @@ mod tests {
use super::*;
use crate::keyspace::KeySpaceAccum;
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
@@ -4609,7 +4588,7 @@ mod tests {
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
use itertools::Itertools;
use pageserver_api::key::{AUX_FILES_KEY, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
use rand::{thread_rng, Rng};
@@ -4618,7 +4597,6 @@ mod tests {
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
use timeline::{DeltaLayerTestDesc, GcInfo};
use utils::bin_ser::BeSer;
use utils::id::TenantId;
static TEST_KEY: Lazy<Key> =
@@ -6422,16 +6400,9 @@ mod tests {
}
#[tokio::test]
async fn test_branch_copies_dirty_aux_file_flag() {
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag")
.await
.unwrap();
async fn test_aux_file_e2e() {
let harness = TenantHarness::create("test_aux_file_e2e").await.unwrap();
// the default aux file policy to switch to is v2 if not set by the admins
assert_eq!(
harness.tenant_conf.switch_aux_file_policy,
AuxFilePolicy::default_tenant_config()
);
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
@@ -6441,9 +6412,6 @@ mod tests {
.await
.unwrap();
// no aux file is written at this point, so the persistent flag should be unset
assert_eq!(tline.last_aux_file_policy.load(), None);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
@@ -6454,30 +6422,6 @@ mod tests {
modification.commit(&ctx).await.unwrap();
}
// there is no tenant manager to pass the configuration through, so let's mimic it
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
assert_eq!(
tline.get_switch_aux_file_policy(),
AuxFilePolicy::V2,
"wanted state has been updated"
);
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"aux file is written with switch_aux_file_policy unset (which is v2), so we should use v2 there"
);
// we can read everything from the storage
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
@@ -6495,12 +6439,6 @@ mod tests {
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"keep v2 storage format when new files are written"
);
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test2"),
@@ -6512,321 +6450,9 @@ mod tests {
.await
.unwrap();
// child copies the last flag even if that is not on remote storage yet
assert_eq!(child.get_switch_aux_file_policy(), AuxFilePolicy::V2);
assert_eq!(child.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
let files = child.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(files.get("pg_logical/mappings/test1"), None);
assert_eq!(files.get("pg_logical/mappings/test2"), None);
// even if we crash here without flushing the parent timeline with its new
// last_aux_file_policy we are safe, because the child was never meant to access the ancestor's
// files. the ancestor can even safely switch back to V1 because of a migration.
}
#[tokio::test]
async fn aux_file_policy_switch() {
let mut harness = TenantHarness::create("aux_file_policy_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::CrossValidation; // set to cross-validation mode
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
// there is no tenant manager to pass the configuration through, so let's mimic it
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
assert_eq!(
tline.get_switch_aux_file_policy(),
AuxFilePolicy::V2,
"wanted state has been updated"
);
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::CrossValidation),
"dirty index_part.json reflected state is yet to be updated"
);
// we can still read the auxfile v1 before we ingest anything new
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"second", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"ingesting a file should apply the wanted switch state when applicable"
);
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first")),
"cross validation writes to both v1 and v2 so this should be available in v2"
);
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"second"))
);
// mimic again by trying to flip it from V2 to V1 (not switched to while ingesting a file)
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V1),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"third", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.get_switch_aux_file_policy(),
AuxFilePolicy::V1,
"wanted state has been updated again, even if invalid request"
);
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"ingesting a file should apply the wanted switch state when applicable"
);
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"third"))
);
// mimic again by trying to flip it from V1 to V2 (not switched to while ingesting a file)
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test3", b"last", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(tline.get_switch_aux_file_policy(), AuxFilePolicy::V2);
assert_eq!(tline.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"third"))
);
assert_eq!(
files.get("pg_logical/mappings/test3"),
Some(&bytes::Bytes::from_static(b"last"))
);
}
#[tokio::test]
async fn aux_file_policy_force_switch() {
let mut harness = TenantHarness::create("aux_file_policy_force_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"dirty index_part.json reflected state is yet to be updated"
);
// lose all data from v1
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(files.get("pg_logical/mappings/test1"), None);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"second", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
// read data ingested in v2
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"second"))
);
// lose all data from v1
assert_eq!(files.get("pg_logical/mappings/test1"), None);
}
#[tokio::test]
async fn aux_file_policy_auto_detect() {
let mut harness = TenantHarness::create("aux_file_policy_auto_detect")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V2; // set to v2 mode
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
files: vec![(
"test_file".to_string(),
Bytes::copy_from_slice(b"test_file"),
)]
.into_iter()
.collect(),
})
.unwrap();
modification.put_for_test(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
modification.commit(&ctx).await.unwrap();
}
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V1),
"keep using v1 because there are aux files writting with v1"
);
// we can still read the auxfile v1
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
assert_eq!(
files.get("test_file"),
Some(&bytes::Bytes::from_static(b"test_file"))
);
}
#[tokio::test]


@@ -187,7 +187,7 @@ use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
pub(crate) use download::download_initdb_tar_zst;
use pageserver_api::models::{AuxFilePolicy, TimelineArchivalState};
use pageserver_api::models::TimelineArchivalState;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
@@ -628,18 +628,6 @@ impl RemoteTimelineClient {
Ok(())
}
/// Launch an index-file upload operation in the background, with only the `aux_file_policy` flag updated.
pub(crate) fn schedule_index_upload_for_aux_file_policy_update(
self: &Arc<Self>,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.dirty.last_aux_file_policy = last_aux_file_policy;
self.schedule_index_upload(upload_queue)?;
Ok(())
}
/// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
///
/// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded,


@@ -133,10 +133,6 @@ impl IndexPart {
pub(crate) fn example() -> Self {
Self::empty(TimelineMetadata::example())
}
pub(crate) fn last_aux_file_policy(&self) -> Option<AuxFilePolicy> {
self.last_aux_file_policy
}
}
/// Metadata gathered for each of the layer files.


@@ -28,9 +28,9 @@ use pageserver_api::{
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, CompactionAlgorithmSettings,
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -98,12 +98,12 @@ use crate::{
use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
use crate::{
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
pgdatadir_mapping::DirectoryKind,
virtual_file::{MaybeFatalIo, VirtualFile},
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
use crate::config::PageServerConf;
@@ -206,11 +206,6 @@ pub struct TimelineResources {
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
pub(crate) struct AuxFilesState {
pub(crate) dir: Option<AuxFilesDirectory>,
pub(crate) n_deltas: usize,
}
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
@@ -413,15 +408,9 @@ pub struct Timeline {
timeline_get_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
/// Keep aux directory cache to avoid its reconstruction on each update
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
/// Size estimator for aux file v2
pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
/// Indicate whether aux file v2 storage is enabled.
pub(crate) last_aux_file_policy: AtomicAuxFilePolicy,
/// Some test cases directly place keys into the timeline without actually modifying the directory
/// keys (i.e., DB_DIR). The test cases creating such keys will put the keyspaces here, so that
/// these keys won't get garbage-collected during compaction/GC. This field only modifies the dense
@@ -2012,14 +2001,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length_for_ts)
}
pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.switch_aux_file_policy
.unwrap_or(self.conf.default_tenant_conf.switch_aux_file_policy)
}
pub(crate) fn get_lazy_slru_download(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2152,7 +2133,6 @@ impl Timeline {
resources: TimelineResources,
pg_version: u32,
state: TimelineState,
aux_file_policy: Option<AuxFilePolicy>,
attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
cancel: CancellationToken,
) -> Arc<Self> {
@@ -2282,15 +2262,8 @@ impl Timeline {
timeline_get_throttle: resources.timeline_get_throttle,
aux_files: tokio::sync::Mutex::new(AuxFilesState {
dir: None,
n_deltas: 0,
}),
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
last_aux_file_policy: AtomicAuxFilePolicy::new(aux_file_policy),
#[cfg(test)]
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
@@ -2301,10 +2274,6 @@ impl Timeline {
attach_wal_lag_cooldown,
};
if aux_file_policy == Some(AuxFilePolicy::V1) {
warn!("this timeline is using deprecated aux file policy V1 (when loading the timeline)");
}
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -4479,14 +4448,6 @@ impl Timeline {
) -> Result<(), detach_ancestor::Error> {
detach_ancestor::complete(self, tenant, attempt, ctx).await
}
/// Switch aux file policy and schedule upload to the index part.
pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> {
self.last_aux_file_policy.store(Some(policy));
self.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(policy))?;
Ok(())
}
}
impl Drop for Timeline {


@@ -283,8 +283,6 @@ impl DeleteTimelineFlow {
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.
CreateTimelineCause::Delete,
// Aux file policy is not needed for deletion, assuming deletion does not read aux keyspace
None,
)
.context("create_timeline_struct")?;


@@ -1,8 +1,7 @@
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, BytesMut};
use bytes::BytesMut;
use pageserver_api::key::Key;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants;
@@ -13,7 +12,6 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
/// Can this request be served by neon redo functions
@@ -236,13 +234,9 @@ pub(crate) fn apply_in_neon(
LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
}
}
NeonWalRecord::AuxFile { file_path, content } => {
let mut dir = AuxFilesDirectory::des(page)?;
dir.upsert(file_path.clone(), content.clone());
page.clear();
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
NeonWalRecord::AuxFile { .. } => {
// No-op: this record will never be created in aux v2.
warn!("AuxFile record should not be created in aux v2");
}
#[cfg(test)]
NeonWalRecord::Test {
@@ -250,6 +244,7 @@ pub(crate) fn apply_in_neon(
clear,
will_init,
} => {
use bytes::BufMut;
if *will_init {
assert!(*clear, "init record must be clear to ensure correctness");
}
@@ -261,59 +256,3 @@ pub(crate) fn apply_in_neon(
}
Ok(())
}
#[cfg(test)]
mod test {
use bytes::Bytes;
use pageserver_api::key::AUX_FILES_KEY;
use super::*;
use std::collections::HashMap;
/// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
#[test]
fn apply_aux_file_deltas() -> anyhow::Result<()> {
let base_dir = AuxFilesDirectory {
files: HashMap::from([
("two".to_string(), Bytes::from_static(b"content0")),
("three".to_string(), Bytes::from_static(b"contentX")),
]),
};
let base_image = AuxFilesDirectory::ser(&base_dir)?;
let deltas = vec![
// Insert
NeonWalRecord::AuxFile {
file_path: "one".to_string(),
content: Some(Bytes::from_static(b"content1")),
},
// Update
NeonWalRecord::AuxFile {
file_path: "two".to_string(),
content: Some(Bytes::from_static(b"content99")),
},
// Delete
NeonWalRecord::AuxFile {
file_path: "three".to_string(),
content: None,
},
];
let file_path = AUX_FILES_KEY;
let mut page = BytesMut::from_iter(base_image);
for record in deltas {
apply_in_neon(&record, Lsn(8), file_path, &mut page)?;
}
let reconstructed = AuxFilesDirectory::des(&page)?;
let expect = HashMap::from([
("one".to_string(), Bytes::from_static(b"content1")),
("two".to_string(), Bytes::from_static(b"content99")),
]);
assert_eq!(reconstructed.files, expect);
Ok(())
}
}


@@ -1,78 +0,0 @@
from __future__ import annotations
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AuxFileStore,
NeonEnvBuilder,
logical_replication_sync,
)
def test_aux_v2_config_switch(neon_env_builder: NeonEnvBuilder, vanilla_pg):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
client = env.pageserver.http_client()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
tenant_config = client.tenant_config(tenant_id).effective_config
tenant_config["switch_aux_file_policy"] = AuxFileStore.V2
client.set_tenant_config(tenant_id, tenant_config)
# aux file v2 is enabled on the write path, so for now, it should be unset (or null)
assert (
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["last_aux_file_policy"]
is None
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("create table t(pk integer primary key, payload integer)")
cur.execute(
"CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));"
)
cur.execute("create publication pub1 for table t, replication_example")
# now start subscriber, aux files will be created at this point. TODO: find better ways of testing aux files (i.e., neon_test_utils)
# instead of going through the full logical replication process.
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
vanilla_pg.safe_psql(
"CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);"
)
connstr = endpoint.connstr().replace("'", "''")
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
# Wait for the logical replication channel to be established
logical_replication_sync(vanilla_pg, endpoint)
vanilla_pg.stop()
endpoint.stop()
with env.pageserver.http_client() as client:
# aux file v2 flag should be enabled at this point
assert (
client.timeline_detail(tenant_id, timeline_id)["last_aux_file_policy"]
== AuxFileStore.V2
)
with env.pageserver.http_client() as client:
tenant_config = client.tenant_config(tenant_id).effective_config
tenant_config["switch_aux_file_policy"] = "V1"
client.set_tenant_config(tenant_id, tenant_config)
# the flag should still be enabled
assert (
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
"last_aux_file_policy"
]
== AuxFileStore.V2
)
env.pageserver.restart()
with env.pageserver.http_client() as client:
# aux file v2 flag should be persisted
assert (
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
"last_aux_file_policy"
]
== AuxFileStore.V2
)