Better API to handle timeline metadata properly

Kirill Bulatov
2021-10-23 01:10:02 +03:00
committed by Kirill Bulatov
parent b532470792
commit e6ef27637b
7 changed files with 240 additions and 118 deletions

View File

@@ -16,13 +16,11 @@ use bookfile::Book;
use bytes::Bytes;
use lazy_static::lazy_static;
use postgres_ffi::pg_constants::BLCKSZ;
use serde::{Deserialize, Serialize};
use tracing::*;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashSet};
use std::convert::TryInto;
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::Write;
@@ -32,6 +30,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use self::metadata::{metadata_path, TimelineMetadata};
use crate::relish::*;
use crate::relish_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord};
@@ -47,7 +46,6 @@ use zenith_metrics::{
register_histogram, register_int_gauge_vec, Histogram, IntGauge, IntGaugeVec,
};
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::crashsafe_dir;
use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn};
use zenith_utils::seqwait::SeqWait;
@@ -59,6 +57,7 @@ mod image_layer;
mod inmemory_layer;
mod interval_tree;
mod layer_map;
pub mod metadata;
mod page_versions;
mod storage_layer;
@@ -111,8 +110,6 @@ lazy_static! {
.expect("failed to define a metric");
}
/// The name of the metadata file pageserver creates per timeline.
pub const METADATA_FILE_NAME: &str = "metadata";
/// Parts of the `.zenith/tenants/<tenantid>/timelines/<timelineid>` directory prefix.
pub const TENANTS_SEGMENT_NAME: &str = "tenants";
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
@@ -145,12 +142,7 @@ impl Repository for LayeredRepository {
// Create the timeline directory, and write initial metadata to file.
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
let metadata = TimelineMetadata {
disk_consistent_lsn: Lsn(0),
prev_record_lsn: None,
ancestor_timeline: None,
ancestor_lsn: Lsn(0),
};
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0));
Self::save_metadata(self.conf, timelineid, self.tenantid, &metadata, true)?;
let timeline = LayeredTimeline::new(
@@ -189,12 +181,7 @@ impl Repository for LayeredRepository {
// Create the metadata file, noting the ancestor of the new timeline.
// There is initially no data in it, but all the read-calls know to look
// into the ancestor.
let metadata = TimelineMetadata {
disk_consistent_lsn: start_lsn,
prev_record_lsn: dst_prev,
ancestor_timeline: Some(src),
ancestor_lsn: start_lsn,
};
let metadata = TimelineMetadata::new(start_lsn, dst_prev, Some(src), start_lsn);
crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenantid))?;
Self::save_metadata(self.conf, dst, self.tenantid, &metadata, true)?;
@@ -266,14 +253,14 @@ impl LayeredRepository {
Some(timeline) => Ok(timeline.clone()),
None => {
let metadata = Self::load_metadata(self.conf, timelineid, self.tenantid)?;
let disk_consistent_lsn = metadata.disk_consistent_lsn;
let disk_consistent_lsn = metadata.disk_consistent_lsn();
// Recurse to look up the ancestor timeline.
//
// TODO: If you have a very deep timeline history, this could become
// expensive. Perhaps delay this until we need to look up a page in
// ancestor.
let ancestor = if let Some(ancestor_timelineid) = metadata.ancestor_timeline {
let ancestor = if let Some(ancestor_timelineid) = metadata.ancestor_timeline() {
Some(self.get_timeline_locked(ancestor_timelineid, timelines)?)
} else {
None
@@ -493,66 +480,6 @@ impl LayeredRepository {
}
}
/// Metadata stored on disk for each timeline
///
/// The fields correspond to the values we hold in memory, in LayeredTimeline.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct TimelineMetadata {
/// [`Lsn`] that corresponds to the corresponding timeline directory
/// contents, stored locally in the pageserver workdir.
pub disk_consistent_lsn: Lsn,
// This is only set if we know it. We track it in memory when the page
// server is running, but we only track the value corresponding to
// 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
// lot. We only store it in the metadata file when we flush *all* the
// in-memory data so that 'last_record_lsn' is the same as
// 'disk_consistent_lsn'. That's OK, because after page server restart, as
// soon as we reprocess at least one record, we will have a valid
// 'prev_record_lsn' value in memory again. This is only really needed when
// doing a clean shutdown, so that there is no more WAL beyond
// 'disk_consistent_lsn'
pub prev_record_lsn: Option<Lsn>,
pub ancestor_timeline: Option<ZTimelineId>,
pub ancestor_lsn: Lsn,
}
impl TimelineMetadata {
pub fn from_bytes(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
ensure!(
metadata_bytes.len() == METADATA_MAX_SAFE_SIZE,
"metadata bytes size is wrong"
);
let data = &metadata_bytes[..METADATA_MAX_DATA_SIZE];
let calculated_checksum = crc32c::crc32c(data);
let checksum_bytes: &[u8; METADATA_CHECKSUM_SIZE] =
metadata_bytes[METADATA_MAX_DATA_SIZE..].try_into()?;
let expected_checksum = u32::from_le_bytes(*checksum_bytes);
ensure!(
calculated_checksum == expected_checksum,
"metadata checksum mismatch"
);
let data = TimelineMetadata::des_prefix(data)?;
assert!(data.disk_consistent_lsn.is_aligned());
Ok(data)
}
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let mut metadata_bytes = TimelineMetadata::ser(self)?;
assert!(metadata_bytes.len() <= METADATA_MAX_DATA_SIZE);
metadata_bytes.resize(METADATA_MAX_SAFE_SIZE, 0u8);
let checksum = crc32c::crc32c(&metadata_bytes[..METADATA_MAX_DATA_SIZE]);
metadata_bytes[METADATA_MAX_DATA_SIZE..].copy_from_slice(&u32::to_le_bytes(checksum));
Ok(metadata_bytes)
}
}
pub struct LayeredTimeline {
conf: &'static PageServerConf,
@@ -901,13 +828,13 @@ impl LayeredTimeline {
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
last: metadata.disk_consistent_lsn,
prev: metadata.prev_record_lsn.unwrap_or(Lsn(0)),
last: metadata.disk_consistent_lsn(),
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
}),
disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn.0),
disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn().0),
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn,
ancestor_lsn: metadata.ancestor_lsn(),
current_logical_size: AtomicUsize::new(current_logical_size),
current_logical_size_gauge,
upload_relishes,
@@ -1315,12 +1242,12 @@ impl LayeredTimeline {
let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
let metadata = TimelineMetadata {
let metadata = TimelineMetadata::new(
disk_consistent_lsn,
prev_record_lsn: ondisk_prev_record_lsn,
ancestor_timeline: ancestor_timelineid,
ancestor_lsn: self.ancestor_lsn,
};
ondisk_prev_record_lsn,
ancestor_timelineid,
self.ancestor_lsn,
);
LayeredRepository::save_metadata(
self.conf,
@@ -1888,15 +1815,6 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
Ok(())
}
pub fn metadata_path(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> PathBuf {
conf.timeline_path(&timelineid, &tenantid)
.join(METADATA_FILE_NAME)
}
/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {

View File

@@ -13,7 +13,7 @@ use anyhow::Result;
use log::*;
use zenith_utils::lsn::Lsn;
use super::METADATA_FILE_NAME;
use super::metadata::METADATA_FILE_NAME;
// Note: LayeredTimeline::load_layer_map() relies on this sort order
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]

View File

@@ -0,0 +1,202 @@
//! Every image of a certain timeline from [`crate::layered_repository::LayeredRepository`]
//! has metadata that needs to be stored persistently.
//!
//! Later, the file is used in [`crate::relish_storage::storage_sync`] as part of
//! external storage import and export operations.
//!
//! This module contains all structs and helper methods related to timeline metadata.
use std::{convert::TryInto, path::PathBuf};
use anyhow::ensure;
use zenith_utils::{
bin_ser::BeSer,
lsn::Lsn,
zid::{ZTenantId, ZTimelineId},
};
use crate::{
layered_repository::{METADATA_CHECKSUM_SIZE, METADATA_MAX_DATA_SIZE, METADATA_MAX_SAFE_SIZE},
PageServerConf,
};
/// The name of the metadata file pageserver creates per timeline.
pub const METADATA_FILE_NAME: &str = "metadata";
/// Metadata stored on disk for each timeline
///
/// The fields correspond to the values we hold in memory, in LayeredTimeline.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct TimelineMetadata {
disk_consistent_lsn: Lsn,
// This is only set if we know it. We track it in memory when the page
// server is running, but we only track the value corresponding to
// 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
// lot. We only store it in the metadata file when we flush *all* the
// in-memory data so that 'last_record_lsn' is the same as
// 'disk_consistent_lsn'. That's OK, because after page server restart, as
// soon as we reprocess at least one record, we will have a valid
// 'prev_record_lsn' value in memory again. This is only really needed when
// doing a clean shutdown, so that there is no more WAL beyond
// 'disk_consistent_lsn'
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
}
/// Points to the place in the pageserver's local directory
/// where a certain timeline's metadata file should be located.
pub fn metadata_path(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> PathBuf {
conf.timeline_path(&timelineid, &tenantid)
.join(METADATA_FILE_NAME)
}
impl TimelineMetadata {
pub fn new(
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
) -> Self {
Self {
disk_consistent_lsn,
prev_record_lsn,
ancestor_timeline,
ancestor_lsn,
}
}
pub fn from_bytes(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
ensure!(
metadata_bytes.len() == METADATA_MAX_SAFE_SIZE,
"metadata bytes size is wrong"
);
let data = &metadata_bytes[..METADATA_MAX_DATA_SIZE];
let calculated_checksum = crc32c::crc32c(data);
let checksum_bytes: &[u8; METADATA_CHECKSUM_SIZE] =
metadata_bytes[METADATA_MAX_DATA_SIZE..].try_into()?;
let expected_checksum = u32::from_le_bytes(*checksum_bytes);
ensure!(
calculated_checksum == expected_checksum,
"metadata checksum mismatch"
);
let data = TimelineMetadata::from(serialize::DeTimelineMetadata::des_prefix(data)?);
assert!(data.disk_consistent_lsn.is_aligned());
Ok(data)
}
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let serializeable_metadata = serialize::SeTimelineMetadata::from(self);
let mut metadata_bytes = serialize::SeTimelineMetadata::ser(&serializeable_metadata)?;
assert!(metadata_bytes.len() <= METADATA_MAX_DATA_SIZE);
metadata_bytes.resize(METADATA_MAX_SAFE_SIZE, 0u8);
let checksum = crc32c::crc32c(&metadata_bytes[..METADATA_MAX_DATA_SIZE]);
metadata_bytes[METADATA_MAX_DATA_SIZE..].copy_from_slice(&u32::to_le_bytes(checksum));
Ok(metadata_bytes)
}
/// [`Lsn`] that corresponds to the contents of the timeline directory,
/// stored locally in the pageserver workdir.
pub fn disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn
}
pub fn prev_record_lsn(&self) -> Option<Lsn> {
self.prev_record_lsn
}
pub fn ancestor_timeline(&self) -> Option<ZTimelineId> {
self.ancestor_timeline
}
pub fn ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
}
}
/// This module handles the direct conversion of metadata to bytes and back.
/// Besides the conversion itself, a few verification steps have to be performed,
/// so all serde derives are hidden from the user to avoid accidental creation
/// of unverified metadata.
mod serialize {
use serde::{Deserialize, Serialize};
use zenith_utils::{lsn::Lsn, zid::ZTimelineId};
use super::TimelineMetadata;
#[derive(Serialize)]
pub(super) struct SeTimelineMetadata<'a> {
disk_consistent_lsn: &'a Lsn,
prev_record_lsn: &'a Option<Lsn>,
ancestor_timeline: &'a Option<ZTimelineId>,
ancestor_lsn: &'a Lsn,
}
impl<'a> From<&'a TimelineMetadata> for SeTimelineMetadata<'a> {
fn from(other: &'a TimelineMetadata) -> Self {
Self {
disk_consistent_lsn: &other.disk_consistent_lsn,
prev_record_lsn: &other.prev_record_lsn,
ancestor_timeline: &other.ancestor_timeline,
ancestor_lsn: &other.ancestor_lsn,
}
}
}
#[derive(Deserialize)]
pub(super) struct DeTimelineMetadata {
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
}
impl From<DeTimelineMetadata> for TimelineMetadata {
fn from(other: DeTimelineMetadata) -> Self {
Self {
disk_consistent_lsn: other.disk_consistent_lsn,
prev_record_lsn: other.prev_record_lsn,
ancestor_timeline: other.ancestor_timeline,
ancestor_lsn: other.ancestor_lsn,
}
}
}
}
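// Design rationale: with the serde derives confined to the private wrapper
// structs above, `TimelineMetadata` itself never implements Serialize or
// Deserialize, so code outside this module can only persist or restore
// metadata through `to_bytes`/`from_bytes` and their checksum verification.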
#[cfg(test)]
mod tests {
use crate::repository::repo_harness::TIMELINE_ID;
use super::*;
#[test]
fn metadata_serializes_correctly() {
let original_metadata = TimelineMetadata {
disk_consistent_lsn: Lsn(0x200),
prev_record_lsn: Some(Lsn(0x100)),
ancestor_timeline: Some(TIMELINE_ID),
ancestor_lsn: Lsn(0),
};
let metadata_bytes = original_metadata
.to_bytes()
.expect("Should serialize correct metadata to bytes");
let deserialized_metadata = TimelineMetadata::from_bytes(&metadata_bytes)
.expect("Should deserialize its own bytes");
assert_eq!(
deserialized_metadata, original_metadata,
"Metadata that was serialized to bytes and deserialized back should not change"
);
}
}
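Taken together, the new module fixes the on-disk format: the serialized struct is zero-padded to METADATA_MAX_DATA_SIZE bytes and followed by a little-endian crc32c checksum of that data region, METADATA_MAX_SAFE_SIZE bytes in total. A minimal round trip through the public API might look like this (a sketch against this commit, assuming the crate's types are in scope and the caller returns an anyhow::Result):

use zenith_utils::lsn::Lsn;

// Build metadata via the new constructor instead of struct literal syntax;
// the arguments are (disk_consistent_lsn, prev_record_lsn, ancestor_timeline, ancestor_lsn).
let metadata = TimelineMetadata::new(Lsn(0x200), Some(Lsn(0x100)), None, Lsn(0));

// to_bytes() pads the serialized data and appends the crc32c checksum;
// from_bytes() verifies both the size and the checksum before deserializing.
let bytes = metadata.to_bytes()?;
let restored = TimelineMetadata::from_bytes(&bytes)?;
assert_eq!(restored.disk_consistent_lsn(), Lsn(0x200));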

View File

@@ -16,8 +16,9 @@ use anyhow::{bail, Context};
use tokio::{fs, io};
use tracing::*;
use crate::layered_repository::metadata::METADATA_FILE_NAME;
use super::{parse_ids_from_path, strip_path_prefix, RelishStorage, RemoteRelishInfo};
use crate::layered_repository::METADATA_FILE_NAME;
pub struct LocalFs {
pageserver_workdir: &'static Path,
@@ -214,6 +215,7 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
#[cfg(test)]
mod pure_tests {
use crate::{
layered_repository::metadata::METADATA_FILE_NAME,
relish_storage::test_utils::{
custom_tenant_id_path, custom_timeline_id_path, relative_timeline_path,
},

View File

@@ -11,7 +11,7 @@ use anyhow::Context;
use s3::{bucket::Bucket, creds::Credentials, region::Region};
use crate::{
layered_repository::METADATA_FILE_NAME,
layered_repository::metadata::METADATA_FILE_NAME,
relish_storage::{parse_ids_from_path, strip_path_prefix, RelishStorage, RemoteRelishInfo},
S3Config,
};

View File

@@ -75,7 +75,7 @@ use tracing::*;
use super::{RelishStorage, RemoteRelishInfo};
use crate::{
layered_repository::{metadata_path, TimelineMetadata},
layered_repository::metadata::{metadata_path, TimelineMetadata},
tenant_mgr::register_relish_download,
PageServerConf,
};
@@ -151,7 +151,9 @@ struct RemoteTimeline {
impl RemoteTimeline {
fn disk_consistent_lsn(&self) -> Option<Lsn> {
self.metadata.as_ref().map(|meta| meta.disk_consistent_lsn)
self.metadata
.as_ref()
.map(|meta| meta.disk_consistent_lsn())
}
}
@@ -333,7 +335,7 @@ fn latest_timelines(
if latest_timeline_id != &remote_timeline_id
&& timeline_metadata
.as_ref()
.map(|metadata| metadata.disk_consistent_lsn)
.map(|metadata| metadata.disk_consistent_lsn())
< remote_timeline_data.disk_consistent_lsn()
{
*latest_timeline_id = remote_timeline_id;
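One subtlety in the comparison above: both sides are `Option<Lsn>`, and Rust's derived `Ord` for `Option` places `None` below every `Some`, so a remote timeline with missing metadata can never outrank one whose disk_consistent_lsn is known. A quick standalone illustration (not from this diff):

// None sorts below any Some(_) under Option's derived Ord.
assert!(None::<u32> < Some(0));
assert!(Some(1_u32) < Some(2));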
@@ -518,8 +520,8 @@ async fn upload_timeline<'a, P, S: 'static + RelishStorage<RelishStoragePath = P
match &uploaded_timeline_files.metadata {
None => debug!("Partially uploaded timeline found, downloading missing files only"),
Some(remote_metadata) => {
let new_lsn = new_upload.metadata.disk_consistent_lsn;
let remote_lsn = remote_metadata.disk_consistent_lsn;
let new_lsn = new_upload.metadata.disk_consistent_lsn();
let remote_lsn = remote_metadata.disk_consistent_lsn();
match new_lsn.cmp(&remote_lsn) {
Ordering::Equal | Ordering::Less => {
warn!(
@@ -903,7 +905,7 @@ mod tests {
let new_upload_metadata = dummy_metadata(Lsn(0x20));
assert!(
new_upload_metadata.disk_consistent_lsn < first_upload_metadata.disk_consistent_lsn
new_upload_metadata.disk_consistent_lsn() < first_upload_metadata.disk_consistent_lsn()
);
let new_upload =
create_local_timeline(&repo_harness, TIMELINE_ID, &["b", "c"], new_upload_metadata)?;
@@ -927,7 +929,8 @@ mod tests {
)?;
let second_paths = second_timeline.layers.clone();
assert!(
first_upload_metadata.disk_consistent_lsn < second_upload_metadata.disk_consistent_lsn
first_upload_metadata.disk_consistent_lsn()
< second_upload_metadata.disk_consistent_lsn()
);
ensure_correct_timeline_upload(
&repo_harness,
@@ -955,7 +958,8 @@ mod tests {
let third_upload_metadata = dummy_metadata(Lsn(0x50));
assert!(
second_upload_metadata.disk_consistent_lsn < third_upload_metadata.disk_consistent_lsn
second_upload_metadata.disk_consistent_lsn()
< third_upload_metadata.disk_consistent_lsn()
);
let third_timeline = create_local_timeline(
&repo_harness,
@@ -1249,7 +1253,7 @@ mod tests {
while let Some(task) = queue_accessor.pop() {
let task_lsn = match &task {
SyncTask::Upload(LocalTimeline { metadata, .. }) => {
Some(metadata.disk_consistent_lsn)
Some(metadata.disk_consistent_lsn())
}
SyncTask::UrgentDownload(remote_timeline) | SyncTask::Download(remote_timeline) => {
remote_timeline.disk_consistent_lsn()
@@ -1257,8 +1261,8 @@ mod tests {
};
if let Some(task_lsn) = task_lsn {
if task_lsn == smaller_lsn_metadata.disk_consistent_lsn
|| task_lsn == bigger_lsn_metadata.disk_consistent_lsn
if task_lsn == smaller_lsn_metadata.disk_consistent_lsn()
|| task_lsn == bigger_lsn_metadata.disk_consistent_lsn()
{
ordered_tasks.push(task);
}
@@ -1549,11 +1553,6 @@ mod tests {
}
fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
TimelineMetadata {
disk_consistent_lsn,
prev_record_lsn: None,
ancestor_timeline: None,
ancestor_lsn: Lsn(0),
}
TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0))
}
}

View File

@@ -323,9 +323,10 @@ pub mod repo_harness {
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
use crate::layered_repository::metadata::METADATA_FILE_NAME;
use super::repo_harness::*;
use super::*;
use crate::layered_repository::METADATA_FILE_NAME;
use postgres_ffi::{pg_constants, xlog_utils::SIZEOF_CHECKPOINT};
/// Arbitrary relation tag, for testing.