Compare commits

...

7 Commits

Author SHA1 Message Date
Alex Chi Z
37e25ab51e fix tests
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-24 13:58:24 -04:00
Alex Chi Z
f1deefc077 fix tests
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-23 12:10:33 -04:00
Alex Chi Z
d4fc271766 fix unit test
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-22 16:48:56 -04:00
Alex Chi Z
21db1bc2f0 metadata v3
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-22 15:52:16 -04:00
Alex Chi Z
21addb827b fix unit test
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-22 15:42:09 -04:00
Alex Chi Z
4b23692615 adapt to metadataupdate interface
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-22 14:19:25 -04:00
Alex Chi Z
667376a9c4 feat(pageserver): add aux_file_v2 feature flag
Signed-off-by: Alex Chi Z <chi@neon.tech>

add tests

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-22 14:14:02 -04:00
13 changed files with 331 additions and 31 deletions

View File

@@ -434,6 +434,11 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("parse `timeline_get_throttle` from json")?,
try_enable_aux_file_v2: settings
.remove("try_enable_aux_file_v2")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'try_enable_aux_file_v2' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -552,6 +557,11 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("parse `timeline_get_throttle` from json")?,
try_enable_aux_file_v2: settings
.remove("try_enable_aux_file_v2")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'try_enable_aux_file_v2' as bool")?,
}
};

View File

@@ -303,6 +303,7 @@ pub struct TenantConfig {
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub try_enable_aux_file_v2: Option<bool>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -578,6 +579,9 @@ pub struct TimelineInfo {
pub state: TimelineState,
pub walreceiver_status: String,
/// Whether aux file v2 is enabled
pub aux_file_v2: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -74,6 +74,8 @@ struct MetadataCmd {
prev_record_lsn: Option<Lsn>,
/// Replace latest gc cutoff
latest_gc_cuttoff: Option<Lsn>,
/// Enable aux file v2 storage
aux_file_v2: Option<bool>,
}
#[derive(Parser)]
@@ -213,12 +215,14 @@ fn handle_metadata(
disk_consistent_lsn,
prev_record_lsn,
latest_gc_cuttoff,
aux_file_v2,
}: &MetadataCmd,
) -> Result<(), anyhow::Error> {
let metadata_bytes = std::fs::read(path)?;
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
// TODO: simplify this part
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
meta = TimelineMetadata::new(
*disk_consistent_lsn,
@@ -228,6 +232,7 @@ fn handle_metadata(
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
meta.aux_file_v2(),
);
update_meta = true;
}
@@ -240,6 +245,7 @@ fn handle_metadata(
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
meta.aux_file_v2(),
);
update_meta = true;
}
@@ -252,6 +258,20 @@ fn handle_metadata(
*latest_gc_cuttoff,
meta.initdb_lsn(),
meta.pg_version(),
meta.aux_file_v2(),
);
update_meta = true;
}
if let Some(aux_file_v2) = aux_file_v2 {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
*aux_file_v2,
);
update_meta = true;
}

View File

@@ -426,6 +426,10 @@ async fn build_timeline_info_common(
state,
walreceiver_status,
aux_file_v2: timeline
.aux_file_v2
.load(std::sync::atomic::Ordering::SeqCst),
};
Ok(info)
}

View File

@@ -32,7 +32,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -1399,6 +1399,31 @@ impl<'a> DatadirModification<'a> {
Some(Bytes::copy_from_slice(content))
};
// TODO: either ensure we don't flip the flag for users with existing AUX files, or do a check there.
let aux_file_v2 = {
let tline_aux_file_v2 = self
.tline
.aux_file_v2
.load(std::sync::atomic::Ordering::SeqCst);
if tline_aux_file_v2 {
true
} else if self.tline.get_try_enable_aux_file_v2() {
info!(
"enabling aux file v2 support for timeline {}",
self.tline.timeline_id
);
// The next index part upload will have `aux_file_v2` set to `true`.
self.tline
.aux_file_v2
.store(true, std::sync::atomic::Ordering::SeqCst);
true
} else {
false
}
};
let _ = aux_file_v2; // keep this unused until the write path is implemented
let n_files;
let mut aux_files = self.tline.aux_files.lock().await;
if let Some(mut dir) = aux_files.dir.take() {

View File

@@ -1346,6 +1346,7 @@ impl Tenant {
initdb_lsn,
initdb_lsn,
pg_version,
false,
);
self.prepare_new_timeline(
new_timeline_id,
@@ -3007,6 +3008,7 @@ impl Tenant {
*src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
src_timeline.initdb_lsn,
src_timeline.pg_version,
src_timeline.aux_file_v2.load(Ordering::SeqCst),
);
let uninitialized_timeline = self
@@ -3210,6 +3212,7 @@ impl Tenant {
pgdata_lsn,
pgdata_lsn,
pg_version,
false,
);
let raw_timeline = self
.prepare_new_timeline(
@@ -3661,6 +3664,7 @@ pub(crate) mod harness {
image_layer_creation_check_threshold: Some(
tenant_conf.image_layer_creation_check_threshold,
),
try_enable_aux_file_v2: Some(tenant_conf.try_enable_aux_file_v2),
}
}
}

View File

@@ -369,6 +369,10 @@ pub struct TenantConf {
// How much WAL must be ingested before checking again whether a new image layer is required.
// Expressed in multiples of checkpoint distance.
pub image_layer_creation_check_threshold: u8,
/// Try to enable the aux file v2 storage. Once this is set to true and the tenant writes an AUX file, the
/// pageserver will always use v2 for AUX files and setting this flag to false will be a no-op.
pub try_enable_aux_file_v2: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -464,6 +468,10 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
pub image_layer_creation_check_threshold: Option<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub try_enable_aux_file_v2: Option<bool>,
}
impl TenantConfOpt {
@@ -521,6 +529,9 @@ impl TenantConfOpt {
image_layer_creation_check_threshold: self
.image_layer_creation_check_threshold
.unwrap_or(global_conf.image_layer_creation_check_threshold),
try_enable_aux_file_v2: self
.try_enable_aux_file_v2
.unwrap_or(global_conf.try_enable_aux_file_v2),
}
}
}
@@ -562,6 +573,7 @@ impl Default for TenantConf {
lazy_slru_download: false,
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
try_enable_aux_file_v2: false,
}
}
}
@@ -636,6 +648,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
try_enable_aux_file_v2: value.try_enable_aux_file_v2,
}
}
}

View File

@@ -14,10 +14,11 @@ use utils::bin_ser::SerializeError;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn};
/// Use special format number to enable backward compatibility.
const METADATA_FORMAT_VERSION: u16 = 4;
const METADATA_FORMAT_VERSION: u16 = 5;
/// Previous supported format versions.
const METADATA_OLD_FORMAT_VERSION: u16 = 3;
const METADATA_OLD_FORMAT_VERSION_V2: u16 = 4;
const METADATA_OLD_FORMAT_VERSION_V1: u16 = 3;
/// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic.
///
@@ -31,7 +32,7 @@ const METADATA_MAX_SIZE: usize = 512;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineMetadata {
hdr: TimelineMetadataHeader,
body: TimelineMetadataBodyV2,
body: TimelineMetadataBodyV3,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -42,6 +43,28 @@ struct TimelineMetadataHeader {
}
const METADATA_HDR_SIZE: usize = std::mem::size_of::<TimelineMetadataHeader>();
/// On-disk body of the timeline metadata file, format version 5.
///
/// Identical to `TimelineMetadataBodyV2` plus the trailing `aux_file_v2`
/// flag. This struct is serialized into a binary blob, so field order is
/// significant: fields must never be reordered or removed, only appended.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV3 {
    disk_consistent_lsn: Lsn,
    // This is only set if we know it. We track it in memory when the page
    // server is running, but we only track the value corresponding to
    // 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
    // lot. We only store it in the metadata file when we flush *all* the
    // in-memory data so that 'last_record_lsn' is the same as
    // 'disk_consistent_lsn'. That's OK, because after page server restart, as
    // soon as we reprocess at least one record, we will have a valid
    // 'prev_record_lsn' value in memory again. This is only really needed when
    // doing a clean shutdown, so that there is no more WAL beyond
    // 'disk_consistent_lsn'
    prev_record_lsn: Option<Lsn>,
    ancestor_timeline: Option<TimelineId>,
    ancestor_lsn: Lsn,
    latest_gc_cutoff_lsn: Lsn,
    initdb_lsn: Lsn,
    pg_version: u32,
    /// Whether the aux file v2 storage format is enabled for this timeline.
    /// New in V3; set to `false` when upgrading from V1/V2 metadata.
    aux_file_v2: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV2 {
disk_consistent_lsn: Lsn,
@@ -84,6 +107,7 @@ struct TimelineMetadataBodyV1 {
}
impl TimelineMetadata {
#[allow(clippy::too_many_arguments)]
pub fn new(
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
@@ -92,6 +116,7 @@ impl TimelineMetadata {
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
aux_file_v2: bool,
) -> Self {
Self {
hdr: TimelineMetadataHeader {
@@ -99,7 +124,7 @@ impl TimelineMetadata {
size: 0,
format_version: METADATA_FORMAT_VERSION,
},
body: TimelineMetadataBodyV2 {
body: TimelineMetadataBodyV3 {
disk_consistent_lsn,
prev_record_lsn,
ancestor_timeline,
@@ -107,6 +132,7 @@ impl TimelineMetadata {
latest_gc_cutoff_lsn,
initdb_lsn,
pg_version,
aux_file_v2,
},
}
}
@@ -115,29 +141,51 @@ impl TimelineMetadata {
let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
// backward compatible only up to this version
ensure!(
hdr.format_version == METADATA_OLD_FORMAT_VERSION,
"unsupported metadata format version {}",
hdr.format_version
);
let body = match hdr.format_version {
METADATA_OLD_FORMAT_VERSION_V2 => {
let metadata_size = hdr.size as usize;
let metadata_size = hdr.size as usize;
let body: TimelineMetadataBodyV2 =
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body: TimelineMetadataBodyV1 =
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body = TimelineMetadataBodyV3 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: body.pg_version,
aux_file_v2: false,
};
let body = TimelineMetadataBodyV2 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
hdr.format_version = METADATA_FORMAT_VERSION;
body
}
METADATA_OLD_FORMAT_VERSION_V1 => {
let metadata_size = hdr.size as usize;
let body: TimelineMetadataBodyV1 =
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body = TimelineMetadataBodyV3 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
aux_file_v2: false,
};
hdr.format_version = METADATA_FORMAT_VERSION;
body
}
_ => {
anyhow::bail!("unsupported metadata format version {}", hdr.format_version);
}
};
hdr.format_version = METADATA_FORMAT_VERSION;
Ok(Self { hdr, body })
}
@@ -165,7 +213,7 @@ impl TimelineMetadata {
TimelineMetadata::upgrade_timeline_metadata(metadata_bytes)
} else {
let body =
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
TimelineMetadataBodyV3::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
ensure!(
body.disk_consistent_lsn.is_aligned(),
"disk_consistent_lsn is not aligned"
@@ -219,6 +267,10 @@ impl TimelineMetadata {
self.body.pg_version
}
/// Whether the aux file v2 storage format is enabled for this timeline.
///
/// Read from the metadata body (format V3 and later); metadata upgraded
/// from older formats has this set to `false`.
pub fn aux_file_v2(&self) -> bool {
    self.body.aux_file_v2
}
// Checksums make it awkward to build a valid instance by hand. This helper
// provides a TimelineMetadata with a valid checksum in its header.
#[cfg(test)]
@@ -231,6 +283,7 @@ impl TimelineMetadata {
Lsn::from_hex("00000000").unwrap(),
Lsn::from_hex("00000000").unwrap(),
0,
false,
);
let bytes = instance.to_bytes().unwrap();
Self::from_bytes(&bytes).unwrap()
@@ -240,6 +293,7 @@ impl TimelineMetadata {
self.body.disk_consistent_lsn = update.disk_consistent_lsn;
self.body.prev_record_lsn = update.prev_record_lsn;
self.body.latest_gc_cutoff_lsn = update.latest_gc_cutoff_lsn;
self.body.aux_file_v2 = update.aux_file_v2;
}
}
@@ -270,6 +324,7 @@ pub(crate) struct MetadataUpdate {
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
latest_gc_cutoff_lsn: Lsn,
aux_file_v2: bool,
}
impl MetadataUpdate {
@@ -277,11 +332,13 @@ impl MetadataUpdate {
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
latest_gc_cutoff_lsn: Lsn,
aux_file_v2: bool,
) -> Self {
Self {
disk_consistent_lsn,
prev_record_lsn,
latest_gc_cutoff_lsn,
aux_file_v2,
}
}
}
@@ -302,6 +359,7 @@ mod tests {
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
true,
);
let metadata_bytes = original_metadata
@@ -331,7 +389,7 @@ mod tests {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: METADATA_OLD_FORMAT_VERSION,
format_version: METADATA_OLD_FORMAT_VERSION_V1,
},
body: TimelineMetadataBodyV1 {
disk_consistent_lsn: Lsn(0x200),
@@ -349,7 +407,7 @@ mod tests {
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_OLD_FORMAT_VERSION,
format_version: METADATA_OLD_FORMAT_VERSION_V1,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
@@ -376,12 +434,83 @@ mod tests {
Lsn(0),
Lsn(0),
14, // All timelines created before this version had pg_version 14
false,
);
assert_eq!(
deserialized_metadata.body, expected_metadata.body,
"Metadata of the old version {} should be upgraded to the latest version {}",
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
METADATA_OLD_FORMAT_VERSION_V1, METADATA_FORMAT_VERSION
);
}
// Generate old version (V2) metadata and read it with current code.
// Ensure that it is upgraded correctly to the latest (V3) in-memory form.
#[test]
fn test_metadata_upgrade_v2() {
    // Local mirror of the old on-disk layout: standard header + V2 body.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct TimelineMetadataV2 {
        hdr: TimelineMetadataHeader,
        body: TimelineMetadataBodyV2,
    }

    let metadata_v2 = TimelineMetadataV2 {
        hdr: TimelineMetadataHeader {
            // checksum and size are computed in `to_bytes` below.
            checksum: 0,
            size: 0,
            format_version: METADATA_OLD_FORMAT_VERSION_V2,
        },
        body: TimelineMetadataBodyV2 {
            disk_consistent_lsn: Lsn(0x200),
            prev_record_lsn: Some(Lsn(0x100)),
            ancestor_timeline: Some(TIMELINE_ID),
            ancestor_lsn: Lsn(0),
            latest_gc_cutoff_lsn: Lsn(0),
            initdb_lsn: Lsn(0),
            pg_version: 16,
        },
    };

    impl TimelineMetadataV2 {
        /// Serialize the way the old V2 code did: header (with computed size
        /// and body checksum) followed by the body, zero-padded out to
        /// METADATA_MAX_SIZE.
        pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
            let body_bytes = self.body.ser()?;
            let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
            let hdr = TimelineMetadataHeader {
                size: metadata_size as u16,
                format_version: METADATA_OLD_FORMAT_VERSION_V2,
                checksum: crc32c::crc32c(&body_bytes),
            };
            let hdr_bytes = hdr.ser()?;
            let mut metadata_bytes = vec![0u8; METADATA_MAX_SIZE];
            metadata_bytes[0..METADATA_HDR_SIZE].copy_from_slice(&hdr_bytes);
            metadata_bytes[METADATA_HDR_SIZE..metadata_size].copy_from_slice(&body_bytes);
            Ok(metadata_bytes)
        }
    }

    let metadata_bytes = metadata_v2
        .to_bytes()
        .expect("Should serialize correct metadata to bytes");

    // This should deserialize to the latest version format
    let deserialized_metadata = TimelineMetadata::from_bytes(&metadata_bytes)
        .expect("Should deserialize its own bytes");

    // Same logical contents, with the new `aux_file_v2` field defaulting
    // to `false` on upgrade.
    let expected_metadata = TimelineMetadata::new(
        Lsn(0x200),
        Some(Lsn(0x100)),
        Some(TIMELINE_ID),
        Lsn(0),
        Lsn(0),
        Lsn(0),
        16,
        false,
    );

    assert_eq!(
        deserialized_metadata.body, expected_metadata.body,
        "Metadata of the old version {} should be upgraded to the latest version {}",
        METADATA_OLD_FORMAT_VERSION_V2, METADATA_FORMAT_VERSION
    );
}
@@ -396,6 +525,7 @@ mod tests {
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
true,
);
let metadata_bytes = original_metadata
.to_bytes()
@@ -449,12 +579,13 @@ mod tests {
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
true,
);
let expected_bytes = vec![
/* bincode length encoding bytes */
0, 0, 0, 0, 0, 0, 2, 0, // 8 bytes for the length of the serialized vector
/* TimelineMetadataHeader */
4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
97, 148, 11, 30, 0, 71, 0, 5, // checksum, size, format_version (4 + 2 + 2)
/* TimelineMetadataBodyV2 */
0, 0, 0, 0, 0, 0, 2, 0, // disk_consistent_lsn (8 bytes)
1, 0, 0, 0, 0, 0, 0, 1, 0, // prev_record_lsn (9 bytes)
@@ -464,6 +595,7 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, // latest_gc_cutoff_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // initdb_lsn (8 bytes)
0, 0, 0, 15, // pg_version (4 bytes)
1, // aux_file_v2 (1 byte)
/* padding bytes */
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
@@ -480,7 +612,7 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0,
];
let metadata_ser_bytes = original_metadata.ser().unwrap();
assert_eq!(metadata_ser_bytes, expected_bytes);

View File

@@ -1852,6 +1852,7 @@ mod tests {
// Any version will do
// but it should be consistent with the one in the tests
crate::DEFAULT_PG_VERSION,
false,
);
// go through serialize + deserialize to fix the header, including checksum

View File

@@ -40,7 +40,6 @@ use utils::{
vec_map::VecMap,
};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
@@ -54,6 +53,10 @@ use std::{
cmp::{max, min, Ordering},
ops::ControlFlow,
};
use std::{
ops::{Deref, Range},
sync::atomic::AtomicBool,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
@@ -382,6 +385,9 @@ pub struct Timeline {
/// Keep aux directory cache to avoid it's reconstruction on each update
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
/// Indicate whether aux file v2 storage is enabled.
pub(crate) aux_file_v2: AtomicBool,
}
pub struct WalReceiverInfo {
@@ -1737,6 +1743,14 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
/// Returns the effective `try_enable_aux_file_v2` setting for this
/// timeline's tenant: the per-tenant override when one is present,
/// otherwise the pageserver-wide default tenant configuration value.
pub(crate) fn get_try_enable_aux_file_v2(&self) -> bool {
    let conf = self.tenant_conf.load();
    match conf.tenant_conf.try_enable_aux_file_v2 {
        Some(enabled) => enabled,
        None => self.conf.default_tenant_conf.try_enable_aux_file_v2,
    }
}
pub(crate) fn get_lazy_slru_download(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -1987,6 +2001,8 @@ impl Timeline {
dir: None,
n_deltas: 0,
}),
aux_file_v2: AtomicBool::new(false),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -2136,6 +2152,11 @@ impl Timeline {
let shard = self.get_shard_index();
let this = self.myself.upgrade().expect("&self method holds the arc");
if let Some(ref index_part) = index_part {
self.aux_file_v2
.store(index_part.metadata.aux_file_v2(), AtomicOrdering::SeqCst);
}
let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
move || {
let _g = span.entered();
@@ -3606,6 +3627,7 @@ impl Timeline {
disk_consistent_lsn,
ondisk_prev_record_lsn,
*self.latest_gc_cutoff_lsn.read(),
self.aux_file_v2.load(AtomicOrdering::SeqCst),
);
fail_point!("checkpoint-before-saving-metadata", |x| bail!(

View File

@@ -190,6 +190,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"trace_read_requests": True,
"walreceiver_connect_timeout": "13m",
"image_layer_creation_check_threshold": 1,
"try_enable_aux_file_v2": True,
}
ps_http = env.pageserver.http_client()

View File

@@ -0,0 +1,63 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
logical_replication_sync,
)
def test_aux_v2_config_switch(neon_simple_env: NeonEnv, vanilla_pg):
    """Exercise the `try_enable_aux_file_v2` tenant config switch.

    The switch is one-way: the timeline's `aux_file_v2` flag flips on only
    when an AUX file is written while the tenant config is enabled, and it
    stays on even if the tenant config is later disabled or the pageserver
    restarts.
    """
    env = neon_simple_env
    tenant_id = env.initial_tenant
    timeline_id = env.neon_cli.create_branch("test_aux_v2_config_switch", "empty")
    endpoint = env.endpoints.create_start(
        "test_aux_v2_config_switch", config_lines=["log_statement=all"]
    )

    with env.pageserver.http_client() as client:
        tenant_config = client.tenant_config(tenant_id).effective_config
        tenant_config["try_enable_aux_file_v2"] = True
        client.set_tenant_config(tenant_id, tenant_config)
        # aux file v2 is only enabled lazily on the write path, so right
        # after flipping the tenant config the timeline flag is still off
        assert not client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
            "aux_file_v2"
        ]

    pg_conn = endpoint.connect()
    cur = pg_conn.cursor()

    # Set up a publication on the Neon endpoint so a subscriber can attach.
    cur.execute("create table t(pk integer primary key, payload integer)")
    cur.execute(
        "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));"
    )
    cur.execute("create publication pub1 for table t, replication_example")

    # now start subscriber, aux files will be created at this point. TODO: find better ways of testing aux files (i.e., neon_test_utils)
    # instead of going through the full logical replication process.
    vanilla_pg.start()
    vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
    vanilla_pg.safe_psql(
        "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);"
    )
    connstr = endpoint.connstr().replace("'", "''")
    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")

    # Wait logical replication channel to be established
    logical_replication_sync(vanilla_pg, endpoint)
    vanilla_pg.stop()
    endpoint.stop()

    # The pageserver logs this message when it flips the flag on the write path.
    env.pageserver.assert_log_contains("enabling aux file v2 support")
    with env.pageserver.http_client() as client:
        # aux file v2 flag should be enabled at this point
        assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["aux_file_v2"]
    with env.pageserver.http_client() as client:
        tenant_config = client.tenant_config(tenant_id).effective_config
        tenant_config["try_enable_aux_file_v2"] = False
        client.set_tenant_config(tenant_id, tenant_config)
        # the flag is one-way: disabling the tenant config must not clear it
        assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["aux_file_v2"]

    env.pageserver.restart()
    with env.pageserver.http_client() as client:
        # aux file v2 flag should be persisted across pageserver restart
        assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["aux_file_v2"]

View File

@@ -192,6 +192,7 @@ def test_backward_compatibility(
assert not breaking_changes_allowed, "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
@pytest.xfail
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")