Compare commits

...

4 Commits

Author      SHA1        Message                                                      Date
Alex Chi Z  2682a32fec  Merge branch 'main' into skyzh/aux-file-flag-v2-again        2024-05-09 15:53:47 -04:00
Alex Chi Z  396b9cf42b  use old metadata format by default (for a week)              2024-05-09 13:41:12 -04:00
Alex Chi Z  21f3984fcc  add regression test (Signed-off-by: Alex Chi Z <chi@neon.tech>)  2024-05-09 10:07:57 -04:00
Alex Chi Z  5418e80544  feat(pageserver): use json for timeline metadata encoding (Signed-off-by: Alex Chi Z <chi@neon.tech>)  2024-05-08 14:08:13 -04:00
3 changed files with 270 additions and 102 deletions

View File

@@ -157,7 +157,9 @@ pub fn generate_wal_segment(
dispatch_pgversion!(
pg_version,
pgv::xlog_utils::generate_wal_segment(segno, system_id, lsn),
Err(SerializeError::BadInput)
Err(SerializeError::BadInput(anyhow::anyhow!(
"failed to generate wal segment"
)))
)
}

View File

@@ -48,8 +48,8 @@ impl From<bincode::Error> for DeserializeError {
#[derive(Debug, Error)]
pub enum SerializeError {
/// The serializer isn't able to serialize the supplied data.
#[error("serialize error")]
BadInput,
#[error("serialize error: {0}")]
BadInput(anyhow::Error),
/// While serializing into a `Write` sink, an `io::Error` occurred.
#[error("serialize error: {0}")]
Io(io::Error),
@@ -59,7 +59,7 @@ impl From<bincode::Error> for SerializeError {
fn from(e: bincode::Error) -> Self {
match *e {
bincode::ErrorKind::Io(io_err) => SerializeError::Io(io_err),
_ => SerializeError::BadInput,
err => SerializeError::BadInput(err.into()),
}
}
}
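For illustration only (not part of the diff): with BadInput widened to carry an anyhow::Error, call sites can attach a descriptive message, and the thiserror display string includes it. A minimal sketch, assuming SerializeError from above and the anyhow crate are in scope; reject_empty is a hypothetical helper:

use anyhow::anyhow;

fn reject_empty(data: &[u8]) -> Result<(), SerializeError> {
    if data.is_empty() {
        // The variant now carries a descriptive error instead of being a bare unit.
        return Err(SerializeError::BadInput(anyhow!(
            "refusing to serialize an empty buffer"
        )));
    }
    Ok(())
}

// With #[error("serialize error: {0}")], the inner message shows up in Display:
// reject_empty(&[]).unwrap_err().to_string() == "serialize error: refusing to serialize an empty buffer"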

View File

@@ -14,10 +14,19 @@ use utils::bin_ser::SerializeError;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn};
/// Use special format number to enable backward compatibility.
const METADATA_FORMAT_VERSION: u16 = 4;
///
/// Currently, we are in the transition period between bodyv2 and bodyv3. We will keep
/// both formats available for a week, and then auto-upgrade everything to v3.
///
/// The body format is determined by the header format number. We always use bodyv3 as
/// the in-memory representation of the data. If the header format is the old one, the
/// body will be serialized using bincode. Otherwise, it will be serialized using JSON,
/// which is the default method for bodyv3.
const METADATA_FORMAT_VERSION: u16 = 5;
/// Previous supported format versions.
const METADATA_OLD_FORMAT_VERSION: u16 = 3;
const METADATA_OLD_FORMAT_VERSION_V2: u16 = 4;
const METADATA_OLD_FORMAT_VERSION_V1: u16 = 3;
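As an illustration of the comment above (not part of the diff): the same in-memory bodyv3 is written either as bincode or as JSON, and only the header's format number decides which. A minimal encode-side sketch, assuming the types, constants, and the BeSer trait defined in this file:

fn encode_body(
    format_version: u16,
    body: &TimelineMetadataBodyV3,
) -> Result<Vec<u8>, SerializeError> {
    if format_version == METADATA_FORMAT_VERSION {
        // New header (v5): the body is encoded as JSON.
        serde_json::to_vec(body).map_err(|e| SerializeError::BadInput(e.into()))
    } else {
        // Old header (v4): the body is encoded with bincode, byte-compatible with bodyv2.
        body.ser()
    }
}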
/// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic.
///
@@ -31,7 +40,7 @@ const METADATA_MAX_SIZE: usize = 512;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineMetadata {
hdr: TimelineMetadataHeader,
body: TimelineMetadataBodyV2,
body: TimelineMetadataBodyV3,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -42,6 +51,27 @@ struct TimelineMetadataHeader {
}
const METADATA_HDR_SIZE: usize = std::mem::size_of::<TimelineMetadataHeader>();
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV3 {
disk_consistent_lsn: Lsn,
// This is only set if we know it. We track it in memory when the page
// server is running, but we only track the value corresponding to
// 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
// lot. We only store it in the metadata file when we flush *all* the
// in-memory data so that 'last_record_lsn' is the same as
// 'disk_consistent_lsn'. That's OK, because after page server restart, as
// soon as we reprocess at least one record, we will have a valid
// 'prev_record_lsn' value in memory again. This is only really needed when
// doing a clean shutdown, so that there is no more WAL beyond
// 'disk_consistent_lsn'
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<TimelineId>,
ancestor_lsn: Lsn,
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV2 {
disk_consistent_lsn: Lsn,
@@ -97,9 +127,9 @@ impl TimelineMetadata {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: METADATA_FORMAT_VERSION,
format_version: METADATA_OLD_FORMAT_VERSION_V2, // Default to the old format v2
},
body: TimelineMetadataBodyV2 {
body: TimelineMetadataBodyV3 {
disk_consistent_lsn,
prev_record_lsn,
ancestor_timeline,
@@ -115,44 +145,56 @@ impl TimelineMetadata {
let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
// backward compatible only up to this version
ensure!(
hdr.format_version == METADATA_OLD_FORMAT_VERSION,
"unsupported metadata format version {}",
hdr.format_version
);
let body = match hdr.format_version {
METADATA_OLD_FORMAT_VERSION_V2 => {
let metadata_size = hdr.size as usize;
let metadata_size = hdr.size as usize;
let body: TimelineMetadataBodyV2 =
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body: TimelineMetadataBodyV1 =
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body = TimelineMetadataBodyV3 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: body.pg_version,
};
let body = TimelineMetadataBodyV2 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
hdr.format_version = METADATA_OLD_FORMAT_VERSION_V2; // DO NOT auto-upgrade
body
}
METADATA_OLD_FORMAT_VERSION_V1 => {
let metadata_size = hdr.size as usize;
let body: TimelineMetadataBodyV1 =
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body = TimelineMetadataBodyV3 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
};
hdr.format_version = METADATA_FORMAT_VERSION;
body
}
_ => {
anyhow::bail!("unsupported metadata format version {}", hdr.format_version);
}
};
hdr.format_version = METADATA_FORMAT_VERSION;
Ok(Self { hdr, body })
}
pub fn from_bytes(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
ensure!(
metadata_bytes.len() == METADATA_MAX_SIZE,
"metadata bytes size is wrong"
);
let hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
let metadata_size = hdr.size as usize;
ensure!(
metadata_size <= METADATA_MAX_SIZE,
"corrupted metadata file"
);
let calculated_checksum = crc32c::crc32c(&metadata_bytes[METADATA_HDR_SIZE..metadata_size]);
ensure!(
hdr.checksum == calculated_checksum,
@@ -164,8 +206,8 @@ impl TimelineMetadata {
// upgrade it and return the result
TimelineMetadata::upgrade_timeline_metadata(metadata_bytes)
} else {
let body =
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body: TimelineMetadataBodyV3 =
serde_json::from_slice(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
ensure!(
body.disk_consistent_lsn.is_aligned(),
"disk_consistent_lsn is not aligned"
@@ -175,18 +217,38 @@ impl TimelineMetadata {
}
pub fn to_bytes(&self) -> Result<Vec<u8>, SerializeError> {
let body_bytes = self.body.ser()?;
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
let mut metadata_bytes = vec![0u8; METADATA_MAX_SIZE];
metadata_bytes[0..METADATA_HDR_SIZE].copy_from_slice(&hdr_bytes);
metadata_bytes[METADATA_HDR_SIZE..metadata_size].copy_from_slice(&body_bytes);
Ok(metadata_bytes)
match self.hdr.format_version {
METADATA_OLD_FORMAT_VERSION_V2 => {
let body_bytes = self.body.ser()?;
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_OLD_FORMAT_VERSION_V2,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
let mut metadata_bytes = vec![0u8; METADATA_MAX_SIZE];
metadata_bytes[0..METADATA_HDR_SIZE].copy_from_slice(&hdr_bytes);
metadata_bytes[METADATA_HDR_SIZE..metadata_size].copy_from_slice(&body_bytes);
Ok(metadata_bytes)
}
METADATA_FORMAT_VERSION => {
let body_bytes = serde_json::to_vec(&self.body)
.map_err(|e| SerializeError::BadInput(e.into()))?;
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
let mut metadata_bytes = Vec::new();
metadata_bytes.extend(hdr_bytes);
metadata_bytes.extend(body_bytes);
Ok(metadata_bytes)
}
_ => unreachable!(),
}
}
/// [`Lsn`] that corresponds to the corresponding timeline directory
@@ -306,6 +368,8 @@ impl MetadataUpdate {
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
use crate::tenant::harness::TIMELINE_ID;
@@ -349,7 +413,7 @@ mod tests {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: METADATA_OLD_FORMAT_VERSION,
format_version: METADATA_OLD_FORMAT_VERSION_V1,
},
body: TimelineMetadataBodyV1 {
disk_consistent_lsn: Lsn(0x200),
@@ -367,7 +431,7 @@ mod tests {
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_OLD_FORMAT_VERSION,
format_version: METADATA_OLD_FORMAT_VERSION_V1,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
@@ -399,66 +463,123 @@ mod tests {
assert_eq!(
deserialized_metadata.body, expected_metadata.body,
"Metadata of the old version {} should be upgraded to the latest version {}",
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
METADATA_OLD_FORMAT_VERSION_V1, METADATA_FORMAT_VERSION
);
}
// Generate old version metadata and read it with current code.
// Ensure that it is upgraded correctly
#[test]
fn test_metadata_bincode_serde() {
let original_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let metadata_bytes = original_metadata
#[ignore]
fn test_metadata_upgrade_v2() {
#[derive(Debug, Clone, PartialEq, Eq)]
struct TimelineMetadataV2 {
hdr: TimelineMetadataHeader,
body: TimelineMetadataBodyV2,
}
let metadata_v2 = TimelineMetadataV2 {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: METADATA_OLD_FORMAT_VERSION_V2,
},
body: TimelineMetadataBodyV2 {
disk_consistent_lsn: Lsn(0x200),
prev_record_lsn: Some(Lsn(0x100)),
ancestor_timeline: Some(TIMELINE_ID),
ancestor_lsn: Lsn(0),
latest_gc_cutoff_lsn: Lsn(0),
initdb_lsn: Lsn(0),
pg_version: 16,
},
};
impl TimelineMetadataV2 {
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let body_bytes = self.body.ser()?;
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_OLD_FORMAT_VERSION_V2,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
let mut metadata_bytes = vec![0u8; METADATA_MAX_SIZE];
metadata_bytes[0..METADATA_HDR_SIZE].copy_from_slice(&hdr_bytes);
metadata_bytes[METADATA_HDR_SIZE..metadata_size].copy_from_slice(&body_bytes);
Ok(metadata_bytes)
}
}
let metadata_bytes = metadata_v2
.to_bytes()
.expect("Cannot create bytes array from metadata");
.expect("Should serialize correct metadata to bytes");
let metadata_bincode_be_bytes = original_metadata
.ser()
.expect("Cannot serialize the metadata");
// This should deserialize to the latest version format
let deserialized_metadata = TimelineMetadata::from_bytes(&metadata_bytes)
.expect("Should deserialize its own bytes");
// 8 bytes for the length of the vector
assert_eq!(metadata_bincode_be_bytes.len(), 8 + metadata_bytes.len());
let expected_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
16,
);
let expected_bincode_bytes = {
let mut temp = vec![];
let len_bytes = metadata_bytes.len().to_be_bytes();
temp.extend_from_slice(&len_bytes);
temp.extend_from_slice(&metadata_bytes);
temp
};
assert_eq!(metadata_bincode_be_bytes, expected_bincode_bytes);
let deserialized_metadata = TimelineMetadata::des(&metadata_bincode_be_bytes).unwrap();
// Deserialized metadata has the metadata header, which is different from the serialized one.
// Reference: TimelineMetaData::to_bytes()
let expected_metadata = {
let mut temp_metadata = original_metadata;
let body_bytes = temp_metadata
.body
.ser()
.expect("Cannot serialize the metadata body");
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
temp_metadata.hdr = hdr;
temp_metadata
};
assert_eq!(deserialized_metadata, expected_metadata);
assert_eq!(
deserialized_metadata.body, expected_metadata.body,
"Metadata of the old version {} should be upgraded to the latest version {}",
METADATA_OLD_FORMAT_VERSION_V2, METADATA_FORMAT_VERSION
);
}
#[test]
fn test_metadata_bincode_serde_ensure_roundtrip() {
let original_metadata = TimelineMetadata::new(
fn test_roundtrip_metadata_v2() {
let metadata_v2 = TimelineMetadata {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: METADATA_OLD_FORMAT_VERSION_V2,
},
body: TimelineMetadataBodyV3 {
disk_consistent_lsn: Lsn(0x200),
prev_record_lsn: Some(Lsn(0x100)),
ancestor_timeline: Some(TIMELINE_ID),
ancestor_lsn: Lsn(0),
latest_gc_cutoff_lsn: Lsn(0),
initdb_lsn: Lsn(0),
pg_version: 16,
},
};
let metadata_bytes = metadata_v2
.to_bytes()
.expect("Should serialize correct metadata to bytes");
// This should deserialize to the latest version format
let deserialized_metadata = TimelineMetadata::from_bytes(&metadata_bytes)
.expect("Should deserialize its own bytes");
let expected_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
16,
);
assert_eq!(deserialized_metadata.body, expected_metadata.body);
}
#[test]
fn test_encode_regression_v2() {
let mut original_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
@@ -468,6 +589,8 @@ mod tests {
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
original_metadata.hdr.format_version = METADATA_OLD_FORMAT_VERSION_V2;
let expected_bytes = vec![
/* bincode length encoding bytes */
0, 0, 0, 0, 0, 0, 2, 0, // 8 bytes for the length of the serialized vector
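// 0x0000_0000_0000_0200 big-endian = 512 = METADATA_MAX_SIZE, the length of the buffer produced by to_bytes()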
@@ -512,7 +635,7 @@ mod tests {
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
format_version: METADATA_OLD_FORMAT_VERSION_V2,
checksum: crc32c::crc32c(&body_bytes),
};
temp_metadata.hdr = hdr;
@@ -521,4 +644,47 @@ mod tests {
let des_metadata = TimelineMetadata::des(&metadata_ser_bytes).unwrap();
assert_eq!(des_metadata, expected_metadata);
}
#[test]
fn test_encode_regression_v3() {
let metadata_v3 = TimelineMetadata {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: METADATA_FORMAT_VERSION,
},
body: TimelineMetadataBodyV3 {
disk_consistent_lsn: Lsn(0x200),
prev_record_lsn: Some(Lsn(0x100)),
ancestor_timeline: Some(TIMELINE_ID),
ancestor_lsn: Lsn(0),
latest_gc_cutoff_lsn: Lsn(0),
initdb_lsn: Lsn(0),
pg_version: 16,
},
};
let metadata_bytes = metadata_v3
.to_bytes()
.expect("Should serialize correct metadata to bytes");
assert_eq!(
&metadata_bytes[..METADATA_HDR_SIZE],
&[202, 106, 183, 219, 0, 205, 0, 5]
);
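// Header layout (big-endian BeSer): checksum: u32, size: u16, format_version: u16.
// [202, 106, 183, 219] is the crc32c of the JSON body, [0, 205] is the total size
// (8-byte header + 197-byte compact JSON body), and [0, 5] is METADATA_FORMAT_VERSION.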
let json_value: serde_json::Value =
serde_json::from_slice(&metadata_bytes[METADATA_HDR_SIZE..]).unwrap();
assert_eq!(
json_value,
json!({
"ancestor_lsn": "0/0",
"ancestor_timeline": "11223344556677881122334455667788",
"disk_consistent_lsn": "0/200",
"initdb_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/0",
"pg_version": 16,
"prev_record_lsn": "0/100"
})
);
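// In the JSON body, Lsn values appear in their "hi/lo" display form (Lsn(0x200) => "0/200")
// and the ancestor TimelineId as its 32-character hex string.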
}
}