mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-05 07:10:38 +00:00
Compare commits
7 Commits
zerocopy-p
...
skyzh/aux-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37e25ab51e | ||
|
|
f1deefc077 | ||
|
|
d4fc271766 | ||
|
|
21db1bc2f0 | ||
|
|
21addb827b | ||
|
|
4b23692615 | ||
|
|
667376a9c4 |
@@ -434,6 +434,11 @@ impl PageServerNode {
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `timeline_get_throttle` from json")?,
|
||||
try_enable_aux_file_v2: settings
|
||||
.remove("try_enable_aux_file_v2")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'try_enable_aux_file_v2' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -552,6 +557,11 @@ impl PageServerNode {
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `timeline_get_throttle` from json")?,
|
||||
try_enable_aux_file_v2: settings
|
||||
.remove("try_enable_aux_file_v2")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'try_enable_aux_file_v2' as bool")?,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -303,6 +303,7 @@ pub struct TenantConfig {
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
pub timeline_get_throttle: Option<ThrottleConfig>,
|
||||
pub image_layer_creation_check_threshold: Option<u8>,
|
||||
pub try_enable_aux_file_v2: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -578,6 +579,9 @@ pub struct TimelineInfo {
|
||||
pub state: TimelineState,
|
||||
|
||||
pub walreceiver_status: String,
|
||||
|
||||
/// Whether aux file v2 is enabled
|
||||
pub aux_file_v2: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
||||
@@ -74,6 +74,8 @@ struct MetadataCmd {
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
/// Replace latest gc cuttoff
|
||||
latest_gc_cuttoff: Option<Lsn>,
|
||||
/// Enable aux file v2 storage
|
||||
aux_file_v2: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -213,12 +215,14 @@ fn handle_metadata(
|
||||
disk_consistent_lsn,
|
||||
prev_record_lsn,
|
||||
latest_gc_cuttoff,
|
||||
aux_file_v2,
|
||||
}: &MetadataCmd,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let metadata_bytes = std::fs::read(path)?;
|
||||
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
|
||||
println!("Current metadata:\n{meta:?}");
|
||||
let mut update_meta = false;
|
||||
// TODO: simplify this part
|
||||
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
|
||||
meta = TimelineMetadata::new(
|
||||
*disk_consistent_lsn,
|
||||
@@ -228,6 +232,7 @@ fn handle_metadata(
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
meta.aux_file_v2(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
@@ -240,6 +245,7 @@ fn handle_metadata(
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
meta.aux_file_v2(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
@@ -252,6 +258,20 @@ fn handle_metadata(
|
||||
*latest_gc_cuttoff,
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
meta.aux_file_v2(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
if let Some(aux_file_v2) = aux_file_v2 {
|
||||
meta = TimelineMetadata::new(
|
||||
meta.disk_consistent_lsn(),
|
||||
meta.prev_record_lsn(),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
*aux_file_v2,
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
|
||||
@@ -426,6 +426,10 @@ async fn build_timeline_info_common(
|
||||
state,
|
||||
|
||||
walreceiver_status,
|
||||
|
||||
aux_file_v2: timeline
|
||||
.aux_file_v2
|
||||
.load(std::sync::atomic::Ordering::SeqCst),
|
||||
};
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::vec_map::{VecMap, VecMapOrdering};
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
@@ -1399,6 +1399,31 @@ impl<'a> DatadirModification<'a> {
|
||||
Some(Bytes::copy_from_slice(content))
|
||||
};
|
||||
|
||||
// TODO: either ensure we don't flip the flag for users with existing AUX files, or do a check there.
|
||||
let aux_file_v2 = {
|
||||
let tline_aux_file_v2 = self
|
||||
.tline
|
||||
.aux_file_v2
|
||||
.load(std::sync::atomic::Ordering::SeqCst);
|
||||
if tline_aux_file_v2 {
|
||||
true
|
||||
} else if self.tline.get_try_enable_aux_file_v2() {
|
||||
info!(
|
||||
"enabling aux file v2 support for timeline {}",
|
||||
self.tline.timeline_id
|
||||
);
|
||||
// The next index part upload will have `aux_file_v2` set to `true`.
|
||||
self.tline
|
||||
.aux_file_v2
|
||||
.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
let _ = aux_file_v2; // keep this unused until the write path is implemented
|
||||
|
||||
let n_files;
|
||||
let mut aux_files = self.tline.aux_files.lock().await;
|
||||
if let Some(mut dir) = aux_files.dir.take() {
|
||||
|
||||
@@ -1346,6 +1346,7 @@ impl Tenant {
|
||||
initdb_lsn,
|
||||
initdb_lsn,
|
||||
pg_version,
|
||||
false,
|
||||
);
|
||||
self.prepare_new_timeline(
|
||||
new_timeline_id,
|
||||
@@ -3007,6 +3008,7 @@ impl Tenant {
|
||||
*src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
|
||||
src_timeline.initdb_lsn,
|
||||
src_timeline.pg_version,
|
||||
src_timeline.aux_file_v2.load(Ordering::SeqCst),
|
||||
);
|
||||
|
||||
let uninitialized_timeline = self
|
||||
@@ -3210,6 +3212,7 @@ impl Tenant {
|
||||
pgdata_lsn,
|
||||
pgdata_lsn,
|
||||
pg_version,
|
||||
false,
|
||||
);
|
||||
let raw_timeline = self
|
||||
.prepare_new_timeline(
|
||||
@@ -3661,6 +3664,7 @@ pub(crate) mod harness {
|
||||
image_layer_creation_check_threshold: Some(
|
||||
tenant_conf.image_layer_creation_check_threshold,
|
||||
),
|
||||
try_enable_aux_file_v2: Some(tenant_conf.try_enable_aux_file_v2),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,6 +369,10 @@ pub struct TenantConf {
|
||||
// How much WAL must be ingested before checking again whether a new image layer is required.
|
||||
// Expressed in multiples of checkpoint distance.
|
||||
pub image_layer_creation_check_threshold: u8,
|
||||
|
||||
/// Try to enable the aux file v2 storage. Once this is set to true and the tenant writes an AUX file, the
|
||||
/// pageserver will always use v2 for AUX files and setting this flag to false will be a no-op.
|
||||
pub try_enable_aux_file_v2: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -464,6 +468,10 @@ pub struct TenantConfOpt {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_layer_creation_check_threshold: Option<u8>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub try_enable_aux_file_v2: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -521,6 +529,9 @@ impl TenantConfOpt {
|
||||
image_layer_creation_check_threshold: self
|
||||
.image_layer_creation_check_threshold
|
||||
.unwrap_or(global_conf.image_layer_creation_check_threshold),
|
||||
try_enable_aux_file_v2: self
|
||||
.try_enable_aux_file_v2
|
||||
.unwrap_or(global_conf.try_enable_aux_file_v2),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -562,6 +573,7 @@ impl Default for TenantConf {
|
||||
lazy_slru_download: false,
|
||||
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
|
||||
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
|
||||
try_enable_aux_file_v2: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -636,6 +648,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
|
||||
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
|
||||
try_enable_aux_file_v2: value.try_enable_aux_file_v2,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,10 +14,11 @@ use utils::bin_ser::SerializeError;
|
||||
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn};
|
||||
|
||||
/// Use special format number to enable backward compatibility.
|
||||
const METADATA_FORMAT_VERSION: u16 = 4;
|
||||
const METADATA_FORMAT_VERSION: u16 = 5;
|
||||
|
||||
/// Previous supported format versions.
|
||||
const METADATA_OLD_FORMAT_VERSION: u16 = 3;
|
||||
const METADATA_OLD_FORMAT_VERSION_V2: u16 = 4;
|
||||
const METADATA_OLD_FORMAT_VERSION_V1: u16 = 3;
|
||||
|
||||
/// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic.
|
||||
///
|
||||
@@ -31,7 +32,7 @@ const METADATA_MAX_SIZE: usize = 512;
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct TimelineMetadata {
|
||||
hdr: TimelineMetadataHeader,
|
||||
body: TimelineMetadataBodyV2,
|
||||
body: TimelineMetadataBodyV3,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -42,6 +43,28 @@ struct TimelineMetadataHeader {
|
||||
}
|
||||
const METADATA_HDR_SIZE: usize = std::mem::size_of::<TimelineMetadataHeader>();
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
struct TimelineMetadataBodyV3 {
|
||||
disk_consistent_lsn: Lsn,
|
||||
// This is only set if we know it. We track it in memory when the page
|
||||
// server is running, but we only track the value corresponding to
|
||||
// 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
|
||||
// lot. We only store it in the metadata file when we flush *all* the
|
||||
// in-memory data so that 'last_record_lsn' is the same as
|
||||
// 'disk_consistent_lsn'. That's OK, because after page server restart, as
|
||||
// soon as we reprocess at least one record, we will have a valid
|
||||
// 'prev_record_lsn' value in memory again. This is only really needed when
|
||||
// doing a clean shutdown, so that there is no more WAL beyond
|
||||
// 'disk_consistent_lsn'
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
ancestor_timeline: Option<TimelineId>,
|
||||
ancestor_lsn: Lsn,
|
||||
latest_gc_cutoff_lsn: Lsn,
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
aux_file_v2: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
struct TimelineMetadataBodyV2 {
|
||||
disk_consistent_lsn: Lsn,
|
||||
@@ -84,6 +107,7 @@ struct TimelineMetadataBodyV1 {
|
||||
}
|
||||
|
||||
impl TimelineMetadata {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
disk_consistent_lsn: Lsn,
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
@@ -92,6 +116,7 @@ impl TimelineMetadata {
|
||||
latest_gc_cutoff_lsn: Lsn,
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
aux_file_v2: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
hdr: TimelineMetadataHeader {
|
||||
@@ -99,7 +124,7 @@ impl TimelineMetadata {
|
||||
size: 0,
|
||||
format_version: METADATA_FORMAT_VERSION,
|
||||
},
|
||||
body: TimelineMetadataBodyV2 {
|
||||
body: TimelineMetadataBodyV3 {
|
||||
disk_consistent_lsn,
|
||||
prev_record_lsn,
|
||||
ancestor_timeline,
|
||||
@@ -107,6 +132,7 @@ impl TimelineMetadata {
|
||||
latest_gc_cutoff_lsn,
|
||||
initdb_lsn,
|
||||
pg_version,
|
||||
aux_file_v2,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -115,29 +141,51 @@ impl TimelineMetadata {
|
||||
let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
|
||||
|
||||
// backward compatible only up to this version
|
||||
ensure!(
|
||||
hdr.format_version == METADATA_OLD_FORMAT_VERSION,
|
||||
"unsupported metadata format version {}",
|
||||
hdr.format_version
|
||||
);
|
||||
let body = match hdr.format_version {
|
||||
METADATA_OLD_FORMAT_VERSION_V2 => {
|
||||
let metadata_size = hdr.size as usize;
|
||||
|
||||
let metadata_size = hdr.size as usize;
|
||||
let body: TimelineMetadataBodyV2 =
|
||||
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
|
||||
|
||||
let body: TimelineMetadataBodyV1 =
|
||||
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
|
||||
let body = TimelineMetadataBodyV3 {
|
||||
disk_consistent_lsn: body.disk_consistent_lsn,
|
||||
prev_record_lsn: body.prev_record_lsn,
|
||||
ancestor_timeline: body.ancestor_timeline,
|
||||
ancestor_lsn: body.ancestor_lsn,
|
||||
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
|
||||
initdb_lsn: body.initdb_lsn,
|
||||
pg_version: body.pg_version,
|
||||
aux_file_v2: false,
|
||||
};
|
||||
|
||||
let body = TimelineMetadataBodyV2 {
|
||||
disk_consistent_lsn: body.disk_consistent_lsn,
|
||||
prev_record_lsn: body.prev_record_lsn,
|
||||
ancestor_timeline: body.ancestor_timeline,
|
||||
ancestor_lsn: body.ancestor_lsn,
|
||||
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
|
||||
initdb_lsn: body.initdb_lsn,
|
||||
pg_version: 14, // All timelines created before this version had pg_version 14
|
||||
hdr.format_version = METADATA_FORMAT_VERSION;
|
||||
body
|
||||
}
|
||||
METADATA_OLD_FORMAT_VERSION_V1 => {
|
||||
let metadata_size = hdr.size as usize;
|
||||
|
||||
let body: TimelineMetadataBodyV1 =
|
||||
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
|
||||
|
||||
let body = TimelineMetadataBodyV3 {
|
||||
disk_consistent_lsn: body.disk_consistent_lsn,
|
||||
prev_record_lsn: body.prev_record_lsn,
|
||||
ancestor_timeline: body.ancestor_timeline,
|
||||
ancestor_lsn: body.ancestor_lsn,
|
||||
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
|
||||
initdb_lsn: body.initdb_lsn,
|
||||
pg_version: 14, // All timelines created before this version had pg_version 14
|
||||
aux_file_v2: false,
|
||||
};
|
||||
|
||||
hdr.format_version = METADATA_FORMAT_VERSION;
|
||||
body
|
||||
}
|
||||
_ => {
|
||||
anyhow::bail!("unsupported metadata format version {}", hdr.format_version);
|
||||
}
|
||||
};
|
||||
|
||||
hdr.format_version = METADATA_FORMAT_VERSION;
|
||||
|
||||
Ok(Self { hdr, body })
|
||||
}
|
||||
|
||||
@@ -165,7 +213,7 @@ impl TimelineMetadata {
|
||||
TimelineMetadata::upgrade_timeline_metadata(metadata_bytes)
|
||||
} else {
|
||||
let body =
|
||||
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
|
||||
TimelineMetadataBodyV3::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
|
||||
ensure!(
|
||||
body.disk_consistent_lsn.is_aligned(),
|
||||
"disk_consistent_lsn is not aligned"
|
||||
@@ -219,6 +267,10 @@ impl TimelineMetadata {
|
||||
self.body.pg_version
|
||||
}
|
||||
|
||||
pub fn aux_file_v2(&self) -> bool {
|
||||
self.body.aux_file_v2
|
||||
}
|
||||
|
||||
// Checksums make it awkward to build a valid instance by hand. This helper
|
||||
// provides a TimelineMetadata with a valid checksum in its header.
|
||||
#[cfg(test)]
|
||||
@@ -231,6 +283,7 @@ impl TimelineMetadata {
|
||||
Lsn::from_hex("00000000").unwrap(),
|
||||
Lsn::from_hex("00000000").unwrap(),
|
||||
0,
|
||||
false,
|
||||
);
|
||||
let bytes = instance.to_bytes().unwrap();
|
||||
Self::from_bytes(&bytes).unwrap()
|
||||
@@ -240,6 +293,7 @@ impl TimelineMetadata {
|
||||
self.body.disk_consistent_lsn = update.disk_consistent_lsn;
|
||||
self.body.prev_record_lsn = update.prev_record_lsn;
|
||||
self.body.latest_gc_cutoff_lsn = update.latest_gc_cutoff_lsn;
|
||||
self.body.aux_file_v2 = update.aux_file_v2;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,6 +324,7 @@ pub(crate) struct MetadataUpdate {
|
||||
disk_consistent_lsn: Lsn,
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
latest_gc_cutoff_lsn: Lsn,
|
||||
aux_file_v2: bool,
|
||||
}
|
||||
|
||||
impl MetadataUpdate {
|
||||
@@ -277,11 +332,13 @@ impl MetadataUpdate {
|
||||
disk_consistent_lsn: Lsn,
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
latest_gc_cutoff_lsn: Lsn,
|
||||
aux_file_v2: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
disk_consistent_lsn,
|
||||
prev_record_lsn,
|
||||
latest_gc_cutoff_lsn,
|
||||
aux_file_v2,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -302,6 +359,7 @@ mod tests {
|
||||
Lsn(0),
|
||||
// Any version will do here, so use the default
|
||||
crate::DEFAULT_PG_VERSION,
|
||||
true,
|
||||
);
|
||||
|
||||
let metadata_bytes = original_metadata
|
||||
@@ -331,7 +389,7 @@ mod tests {
|
||||
hdr: TimelineMetadataHeader {
|
||||
checksum: 0,
|
||||
size: 0,
|
||||
format_version: METADATA_OLD_FORMAT_VERSION,
|
||||
format_version: METADATA_OLD_FORMAT_VERSION_V1,
|
||||
},
|
||||
body: TimelineMetadataBodyV1 {
|
||||
disk_consistent_lsn: Lsn(0x200),
|
||||
@@ -349,7 +407,7 @@ mod tests {
|
||||
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
|
||||
let hdr = TimelineMetadataHeader {
|
||||
size: metadata_size as u16,
|
||||
format_version: METADATA_OLD_FORMAT_VERSION,
|
||||
format_version: METADATA_OLD_FORMAT_VERSION_V1,
|
||||
checksum: crc32c::crc32c(&body_bytes),
|
||||
};
|
||||
let hdr_bytes = hdr.ser()?;
|
||||
@@ -376,12 +434,83 @@ mod tests {
|
||||
Lsn(0),
|
||||
Lsn(0),
|
||||
14, // All timelines created before this version had pg_version 14
|
||||
false,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
deserialized_metadata.body, expected_metadata.body,
|
||||
"Metadata of the old version {} should be upgraded to the latest version {}",
|
||||
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
|
||||
METADATA_OLD_FORMAT_VERSION_V1, METADATA_FORMAT_VERSION
|
||||
);
|
||||
}
|
||||
|
||||
// Generate old version metadata and read it with current code.
|
||||
// Ensure that it is upgraded correctly
|
||||
#[test]
|
||||
fn test_metadata_upgrade_v2() {
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct TimelineMetadataV2 {
|
||||
hdr: TimelineMetadataHeader,
|
||||
body: TimelineMetadataBodyV2,
|
||||
}
|
||||
|
||||
let metadata_v2 = TimelineMetadataV2 {
|
||||
hdr: TimelineMetadataHeader {
|
||||
checksum: 0,
|
||||
size: 0,
|
||||
format_version: METADATA_OLD_FORMAT_VERSION_V2,
|
||||
},
|
||||
body: TimelineMetadataBodyV2 {
|
||||
disk_consistent_lsn: Lsn(0x200),
|
||||
prev_record_lsn: Some(Lsn(0x100)),
|
||||
ancestor_timeline: Some(TIMELINE_ID),
|
||||
ancestor_lsn: Lsn(0),
|
||||
latest_gc_cutoff_lsn: Lsn(0),
|
||||
initdb_lsn: Lsn(0),
|
||||
pg_version: 16,
|
||||
},
|
||||
};
|
||||
|
||||
impl TimelineMetadataV2 {
|
||||
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
|
||||
let body_bytes = self.body.ser()?;
|
||||
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
|
||||
let hdr = TimelineMetadataHeader {
|
||||
size: metadata_size as u16,
|
||||
format_version: METADATA_OLD_FORMAT_VERSION_V2,
|
||||
checksum: crc32c::crc32c(&body_bytes),
|
||||
};
|
||||
let hdr_bytes = hdr.ser()?;
|
||||
let mut metadata_bytes = vec![0u8; METADATA_MAX_SIZE];
|
||||
metadata_bytes[0..METADATA_HDR_SIZE].copy_from_slice(&hdr_bytes);
|
||||
metadata_bytes[METADATA_HDR_SIZE..metadata_size].copy_from_slice(&body_bytes);
|
||||
Ok(metadata_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
let metadata_bytes = metadata_v2
|
||||
.to_bytes()
|
||||
.expect("Should serialize correct metadata to bytes");
|
||||
|
||||
// This should deserialize to the latest version format
|
||||
let deserialized_metadata = TimelineMetadata::from_bytes(&metadata_bytes)
|
||||
.expect("Should deserialize its own bytes");
|
||||
|
||||
let expected_metadata = TimelineMetadata::new(
|
||||
Lsn(0x200),
|
||||
Some(Lsn(0x100)),
|
||||
Some(TIMELINE_ID),
|
||||
Lsn(0),
|
||||
Lsn(0),
|
||||
Lsn(0),
|
||||
16,
|
||||
false,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
deserialized_metadata.body, expected_metadata.body,
|
||||
"Metadata of the old version {} should be upgraded to the latest version {}",
|
||||
METADATA_OLD_FORMAT_VERSION_V2, METADATA_FORMAT_VERSION
|
||||
);
|
||||
}
|
||||
|
||||
@@ -396,6 +525,7 @@ mod tests {
|
||||
Lsn(0),
|
||||
// Any version will do here, so use the default
|
||||
crate::DEFAULT_PG_VERSION,
|
||||
true,
|
||||
);
|
||||
let metadata_bytes = original_metadata
|
||||
.to_bytes()
|
||||
@@ -449,12 +579,13 @@ mod tests {
|
||||
Lsn(0),
|
||||
// Any version will do here, so use the default
|
||||
crate::DEFAULT_PG_VERSION,
|
||||
true,
|
||||
);
|
||||
let expected_bytes = vec![
|
||||
/* bincode length encoding bytes */
|
||||
0, 0, 0, 0, 0, 0, 2, 0, // 8 bytes for the length of the serialized vector
|
||||
/* TimelineMetadataHeader */
|
||||
4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
|
||||
97, 148, 11, 30, 0, 71, 0, 5, // checksum, size, format_version (4 + 2 + 2)
|
||||
/* TimelineMetadataBodyV3 */
|
||||
0, 0, 0, 0, 0, 0, 2, 0, // disk_consistent_lsn (8 bytes)
|
||||
1, 0, 0, 0, 0, 0, 0, 1, 0, // prev_record_lsn (9 bytes)
|
||||
@@ -464,6 +595,7 @@ mod tests {
|
||||
0, 0, 0, 0, 0, 0, 0, 0, // latest_gc_cutoff_lsn (8 bytes)
|
||||
0, 0, 0, 0, 0, 0, 0, 0, // initdb_lsn (8 bytes)
|
||||
0, 0, 0, 15, // pg_version (4 bytes)
|
||||
1, // aux_file_v2 (1 byte)
|
||||
/* padding bytes */
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
@@ -480,7 +612,7 @@ mod tests {
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0, 0, 0, 0,
|
||||
];
|
||||
let metadata_ser_bytes = original_metadata.ser().unwrap();
|
||||
assert_eq!(metadata_ser_bytes, expected_bytes);
|
||||
|
||||
@@ -1852,6 +1852,7 @@ mod tests {
|
||||
// Any version will do
|
||||
// but it should be consistent with the one in the tests
|
||||
crate::DEFAULT_PG_VERSION,
|
||||
false,
|
||||
);
|
||||
|
||||
// go through serialize + deserialize to fix the header, including checksum
|
||||
|
||||
@@ -40,7 +40,6 @@ use utils::{
|
||||
vec_map::VecMap,
|
||||
};
|
||||
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
@@ -54,6 +53,10 @@ use std::{
|
||||
cmp::{max, min, Ordering},
|
||||
ops::ControlFlow,
|
||||
};
|
||||
use std::{
|
||||
ops::{Deref, Range},
|
||||
sync::atomic::AtomicBool,
|
||||
};
|
||||
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
@@ -382,6 +385,9 @@ pub struct Timeline {
|
||||
|
||||
/// Keep aux directory cache to avoid its reconstruction on each update
|
||||
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
|
||||
|
||||
/// Indicate whether aux file v2 storage is enabled.
|
||||
pub(crate) aux_file_v2: AtomicBool,
|
||||
}
|
||||
|
||||
pub struct WalReceiverInfo {
|
||||
@@ -1737,6 +1743,14 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
|
||||
|
||||
// Private functions
|
||||
impl Timeline {
|
||||
pub(crate) fn get_try_enable_aux_file_v2(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.try_enable_aux_file_v2
|
||||
.unwrap_or(self.conf.default_tenant_conf.try_enable_aux_file_v2)
|
||||
}
|
||||
|
||||
pub(crate) fn get_lazy_slru_download(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -1987,6 +2001,8 @@ impl Timeline {
|
||||
dir: None,
|
||||
n_deltas: 0,
|
||||
}),
|
||||
|
||||
aux_file_v2: AtomicBool::new(false),
|
||||
};
|
||||
result.repartition_threshold =
|
||||
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
|
||||
@@ -2136,6 +2152,11 @@ impl Timeline {
|
||||
let shard = self.get_shard_index();
|
||||
let this = self.myself.upgrade().expect("&self method holds the arc");
|
||||
|
||||
if let Some(ref index_part) = index_part {
|
||||
self.aux_file_v2
|
||||
.store(index_part.metadata.aux_file_v2(), AtomicOrdering::SeqCst);
|
||||
}
|
||||
|
||||
let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
|
||||
move || {
|
||||
let _g = span.entered();
|
||||
@@ -3606,6 +3627,7 @@ impl Timeline {
|
||||
disk_consistent_lsn,
|
||||
ondisk_prev_record_lsn,
|
||||
*self.latest_gc_cutoff_lsn.read(),
|
||||
self.aux_file_v2.load(AtomicOrdering::SeqCst),
|
||||
);
|
||||
|
||||
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
|
||||
|
||||
@@ -190,6 +190,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"trace_read_requests": True,
|
||||
"walreceiver_connect_timeout": "13m",
|
||||
"image_layer_creation_check_threshold": 1,
|
||||
"try_enable_aux_file_v2": True,
|
||||
}
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
63
test_runner/regress/test_aux_files.py
Normal file
63
test_runner/regress/test_aux_files.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
logical_replication_sync,
|
||||
)
|
||||
|
||||
|
||||
def test_aux_v2_config_switch(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
env = neon_simple_env
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_aux_v2_config_switch", "empty")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_aux_v2_config_switch", config_lines=["log_statement=all"]
|
||||
)
|
||||
|
||||
with env.pageserver.http_client() as client:
|
||||
tenant_config = client.tenant_config(tenant_id).effective_config
|
||||
tenant_config["try_enable_aux_file_v2"] = True
|
||||
client.set_tenant_config(tenant_id, tenant_config)
|
||||
# aux file v2 is only enabled lazily on the write path, so the flag must still be unset here
|
||||
assert not client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
|
||||
"aux_file_v2"
|
||||
]
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
cur.execute(
|
||||
"CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));"
|
||||
)
|
||||
cur.execute("create publication pub1 for table t, replication_example")
|
||||
|
||||
# now start subscriber, aux files will be created at this point. TODO: find better ways of testing aux files (i.e., neon_test_utils)
|
||||
# instead of going through the full logical replication process.
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
|
||||
vanilla_pg.safe_psql(
|
||||
"CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);"
|
||||
)
|
||||
connstr = endpoint.connstr().replace("'", "''")
|
||||
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
|
||||
|
||||
# Wait for the logical replication channel to be established
|
||||
logical_replication_sync(vanilla_pg, endpoint)
|
||||
vanilla_pg.stop()
|
||||
endpoint.stop()
|
||||
|
||||
env.pageserver.assert_log_contains("enabling aux file v2 support")
|
||||
with env.pageserver.http_client() as client:
|
||||
# aux file v2 flag should be enabled at this point
|
||||
assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["aux_file_v2"]
|
||||
with env.pageserver.http_client() as client:
|
||||
tenant_config = client.tenant_config(tenant_id).effective_config
|
||||
tenant_config["try_enable_aux_file_v2"] = False
|
||||
client.set_tenant_config(tenant_id, tenant_config)
|
||||
# the flag should still be enabled
|
||||
assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["aux_file_v2"]
|
||||
env.pageserver.restart()
|
||||
with env.pageserver.http_client() as client:
|
||||
# aux file v2 flag should be persisted
|
||||
assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["aux_file_v2"]
|
||||
@@ -192,6 +192,7 @@ def test_backward_compatibility(
|
||||
assert not breaking_changes_allowed, "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
|
||||
|
||||
|
||||
@pytest.xfail
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.order(after="test_create_snapshot")
|
||||
|
||||
Reference in New Issue
Block a user