Compare commits

...

7 Commits

Author SHA1 Message Date
Eric Seppanen
5b79e033bd wip: adapt layered_repository to snapfile 2021-08-19 20:23:11 -07:00
Eric Seppanen
93b2e49939 remove snapshot id; replace with timeline id
The snapshot id doesn't make sense when two snapshots are squashed.
Better to use the timeline id anyway.

It's still a little strange to squash two timelines together, but it
must be possible to flatten history even if there have been many branch
events, so this is should be a control plane / snapshot management
problem.
2021-08-18 12:11:22 -07:00
Eric Seppanen
c3833ef0f4 add snapshot squashing
Add logic to squash snapshot files.
Add snaptool (a binary for inspecting and manipulating snapshot files).
Use bookfile 0.3, which allows concurrent reads.
2021-08-18 12:11:22 -07:00
Eric Seppanen
acfc5c5d21 add another Page constructor, which copies bytes 2021-08-18 12:11:22 -07:00
Eric Seppanen
0a0d12368e snapfile: add snapshot metadata
Add some basic metadata to the snapshot file, including an id number,
predecessor snapshot, and lsn.
2021-08-18 12:11:22 -07:00
Eric Seppanen
8d2b517359 snapfile: split apart code into multiple files
versioned.rs: for things that get serialized and must be versioned to
avoid breaking backwards compatibility.

page.rs: for the Page struct.
2021-08-18 12:11:22 -07:00
Eric Seppanen
26bcd72619 add snapfile
The snapfile crate implements a snapshot file format. The format relies
heavily on the bookfile crate for the structured file format, and the
aversion crate for versioned data structures.

The overall structure of the file looks like this:
- first 4KB: bookfile header
- next 8KB * N: raw page data for N pages
- page index map (from page identifier to data offset)
- bookfile chapter index

When a SnapFile is opened for reading, the page index map is read into
memory; any page can be read directly from the file from that point.
2021-08-18 12:11:22 -07:00
12 changed files with 904 additions and 57 deletions

78
Cargo.lock generated
View File

@@ -703,6 +703,15 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -1403,6 +1412,30 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -1932,6 +1965,21 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "snapfile"
version = "0.1.0"
dependencies = [
"anyhow",
"aversion",
"bookfile",
"hex",
"rand",
"serde",
"structopt",
"tempfile",
"zenith_utils",
]
[[package]]
name = "socket2"
version = "0.4.0"
@@ -1964,6 +2012,30 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "structopt"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69b041cdcb67226aca307e6e7be44c8806423d83e018bd662360a93dabce4d71"
dependencies = [
"clap",
"lazy_static",
"structopt-derive",
]
[[package]]
name = "structopt-derive"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7813934aecf5f51a54775e00068c237de98489463968231a51746bbbc03f9c10"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "subtle"
version = "2.4.1"
@@ -2272,6 +2344,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-width"
version = "0.1.8"

View File

@@ -9,6 +9,7 @@ members = [
"zenith",
"zenith_metrics",
"zenith_utils",
"snapfile",
]
[profile.release]

View File

@@ -41,6 +41,7 @@ use zenith_utils::seqwait::SeqWait;
mod inmemory_layer;
mod layer_map;
mod page_history;
mod snapshot_layer;
mod storage_layer;

View File

@@ -2,15 +2,15 @@
//! An in-memory layer stores recently received page versions in memory. The page versions
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
//!
use crate::layered_repository::page_history::PageHistory;
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::SnapshotLayer;
use crate::layered_repository::{LayeredTimeline, SnapshotLayer};
use crate::repository::WALRecord;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use bytes::Bytes;
use log::*;
use std::collections::BTreeMap;
@@ -43,9 +43,9 @@ pub struct InMemoryLayerInner {
///
/// All versions of all pages in the layer are are kept here.
/// Indexed by block number and LSN.
/// Indexed by block number.
///
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
pages: BTreeMap<u32, PageHistory>,
///
/// `segsizes` tracks the size of the segment at different points in time.
@@ -90,29 +90,32 @@ impl Layer for InMemoryLayer {
) -> Result<Option<Lsn>> {
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
assert!(self.seg.blknum_in_seg(blknum));
{
let inner = self.inner.lock().unwrap();
let minkey = (blknum, Lsn(0));
let maxkey = (blknum, lsn);
let mut iter = inner
.page_versions
.range((Included(&minkey), Included(&maxkey)));
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
let pages = &inner.pages;
// FIXME: this assumes the latest page version is always the right answer.
// How should this work if the requested lsn is in the past? in the future?
let latest_version = pages
.get(&blknum)
.and_then(PageHistory::latest)
.ok_or_else(|| anyhow!("page not found"))?;
let (entry_lsn, entry) = latest_version;
if true {
if let Some(img) = &entry.page_image {
reconstruct_data.page_img = Some(img.clone());
need_base_image_lsn = None;
break;
} else if let Some(rec) = &entry.record {
reconstruct_data.records.push(rec.clone());
if rec.will_init {
// This WAL record initializes the page, so no need to go further back
need_base_image_lsn = None;
break;
} else {
need_base_image_lsn = Some(*entry_lsn);
need_base_image_lsn = Some(entry_lsn);
}
} else {
// No base image, and no WAL record. Huh?
@@ -120,7 +123,7 @@ impl Layer for InMemoryLayer {
}
}
// release lock on 'page_versions'
// release lock on self.pages
}
Ok(need_base_image_lsn)
@@ -184,7 +187,7 @@ impl InMemoryLayer {
start_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
pages: BTreeMap::new(),
segsizes: BTreeMap::new(),
}),
})
@@ -230,15 +233,11 @@ impl InMemoryLayer {
);
let mut inner = self.inner.lock().unwrap();
let old = inner.page_versions.insert((blknum, lsn), pv);
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!(
"Page version of rel {} blk {} at {} already exists",
self.seg.rel, blknum, lsn
);
}
let page_history = inner
.pages
.entry(blknum)
.or_insert_with(PageHistory::default);
page_history.push(lsn, pv);
// Also update the relation size, if this extended the relation.
if self.seg.rel.is_blocky() {
@@ -311,7 +310,7 @@ impl InMemoryLayer {
timelineid,
lsn
);
let mut page_versions = BTreeMap::new();
let mut pages = BTreeMap::new();
let mut segsizes = BTreeMap::new();
let seg = src.get_seg_tag();
@@ -333,7 +332,8 @@ impl InMemoryLayer {
page_image: Some(img),
record: None,
};
page_versions.insert((blknum, lsn), pv);
let page_history = PageHistory::from_image(lsn, pv);
pages.insert(blknum, page_history);
}
Ok(InMemoryLayer {
@@ -344,8 +344,8 @@ impl InMemoryLayer {
start_lsn: lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: page_versions,
segsizes: segsizes,
pages,
segsizes,
}),
})
}
@@ -388,10 +388,11 @@ impl InMemoryLayer {
};
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
let mut before_page_versions;
let mut before_pages = BTreeMap::new();
let mut before_segsizes;
let mut after_page_versions;
let mut after_segsizes;
let mut after_pages = BTreeMap::new();
if !dropped {
before_segsizes = BTreeMap::new();
after_segsizes = BTreeMap::new();
@@ -403,20 +404,16 @@ impl InMemoryLayer {
}
}
before_page_versions = BTreeMap::new();
after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn > end_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
for (blknum, page_history) in inner.pages.iter() {
let (old, new) = page_history.clone().split_at(end_lsn);
before_pages.insert(*blknum, old);
after_pages.insert(*blknum, new);
}
} else {
before_page_versions = inner.page_versions.clone();
before_pages = inner.pages.clone();
before_segsizes = inner.segsizes.clone();
after_segsizes = BTreeMap::new();
after_page_versions = BTreeMap::new();
after_pages = BTreeMap::new();
}
// we can release the lock now.
@@ -431,13 +428,13 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn,
dropped,
before_page_versions,
before_pages,
before_segsizes,
)?;
// If there were any "new" page versions, initialize a new in-memory layer to hold
// them
let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
let new_open = if !after_segsizes.is_empty() || !after_pages.is_empty() {
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
let new_open = Self::copy_snapshot(
@@ -449,7 +446,7 @@ impl InMemoryLayer {
end_lsn,
)?;
let mut new_inner = new_open.inner.lock().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.pages.append(&mut after_pages);
new_inner.segsizes.append(&mut after_segsizes);
drop(new_inner);
@@ -476,14 +473,16 @@ impl InMemoryLayer {
for (k, v) in inner.segsizes.iter() {
result += &format!("{}: {}\n", k, v);
}
for (k, v) in inner.page_versions.iter() {
result += &format!(
"blk {} at {}: {}/{}\n",
k.0,
k.1,
v.page_image.is_some(),
v.record.is_some()
);
for (page_num, page_history) in inner.pages.iter() {
for (lsn, image) in page_history.iter() {
result += &format!(
"blk {} at {}: {}/{}\n",
page_num,
lsn,
image.page_image.is_some(),
image.record.is_some()
);
}
}
result

View File

@@ -0,0 +1,94 @@
use super::storage_layer::PageVersion;
use std::collections::VecDeque;
use zenith_utils::lsn::Lsn;
/// A data structure that holds one or more versions of a particular page number.
//
#[derive(Default, Clone)]
pub struct PageHistory {
/// Pages stored in order, from oldest to newest.
pages: VecDeque<(Lsn, PageVersion)>,
}
impl PageHistory {
/// Create a new PageHistory containing a single image.
pub fn from_image(lsn: Lsn, image: PageVersion) -> Self {
let mut pages = VecDeque::new();
pages.push_back((lsn, image));
PageHistory { pages }
}
/// Push a newer page image.
pub fn push(&mut self, lsn: Lsn, page: PageVersion) {
if let Some((back_lsn, _)) = self.pages.back() {
debug_assert_ne!(
back_lsn, &lsn,
"push page at lsn {:?} but one already exists",
lsn
);
debug_assert!(back_lsn < &lsn, "pushed page is older than latest lsn");
}
self.pages.push_back((lsn, page));
}
pub fn latest(&self) -> Option<(Lsn, &PageVersion)> {
self.pages.back().map(|(lsn, page)| (*lsn, page))
}
/// Split a page history at a particular LSN.
///
/// This consumes this PageHistory and returns two new ones.
/// Any changes exactly matching the split LSN will be in the
/// "old" history.
//
// FIXME: Is this necessary? There is some debate whether "splitting"
// layers is the best design.
//
pub fn split_at(self, split_lsn: Lsn) -> (PageHistory, PageHistory) {
let mut old = PageHistory::default();
let mut new = PageHistory::default();
for (lsn, page) in self.pages {
if lsn > split_lsn {
new.push(lsn, page)
} else {
old.push(lsn, page);
}
}
(old, new)
}
pub fn iter(&self) -> impl Iterator<Item = &(Lsn, PageVersion)> {
self.pages.iter()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn page_history() {
fn make_page(b: u8) -> PageVersion {
let image = vec![b; 8192].into();
PageVersion {
page_image: Some(image),
record: None,
}
}
let mut ph = PageHistory::default();
ph.push(10.into(), make_page(1));
ph.push(20.into(), make_page(2));
ph.push(30.into(), make_page(3));
let (latest_lsn, latest_image) = ph.latest().unwrap();
assert_eq!(latest_lsn, 30.into());
assert!(matches!(latest_image, PageVersion { page_image: Some(im), .. } if im[0] == 3));
let mut it = ph.iter();
assert_eq!(it.next().unwrap().0, 10.into());
assert_eq!(it.next().unwrap().0, 20.into());
assert_eq!(it.next().unwrap().0, 30.into());
assert!(it.next().is_none());
}
}

View File

@@ -37,6 +37,7 @@
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
//!
use crate::layered_repository::page_history::PageHistory;
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageVersion, SegmentTag,
};
@@ -236,7 +237,7 @@ pub struct SnapshotLayerInner {
/// All versions of all pages in the file are are kept here.
/// Indexed by block number and LSN.
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
pages: BTreeMap<u32, PageHistory>,
/// `relsizes` tracks the size of the relation at different points in time.
relsizes: BTreeMap<Lsn, u32>,
@@ -270,6 +271,7 @@ impl Layer for SnapshotLayer {
lsn: Lsn,
reconstruct_data: &mut PageReconstructData,
) -> Result<Option<Lsn>> {
/*
// Scan the BTreeMap backwards, starting from the given entry.
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
{
@@ -303,6 +305,9 @@ impl Layer for SnapshotLayer {
}
Ok(need_base_image_lsn)
*/
todo!()
}
/// Get size of the relation at given LSN
@@ -380,7 +385,7 @@ impl SnapshotLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
pages: BTreeMap<u32, PageHistory>,
relsizes: BTreeMap<Lsn, u32>,
) -> Result<SnapshotLayer> {
let snapfile = SnapshotLayer {
@@ -393,10 +398,12 @@ impl SnapshotLayer {
dropped,
inner: Mutex::new(SnapshotLayerInner {
loaded: true,
page_versions: page_versions,
relsizes: relsizes,
pages,
relsizes,
}),
};
/*
let inner = snapfile.inner.lock().unwrap();
// Write the in-memory btreemaps into a file
@@ -426,12 +433,16 @@ impl SnapshotLayer {
drop(inner);
Ok(snapfile)
*/
todo!()
}
///
/// Load the contents of the file into memory
///
fn load(&self) -> Result<MutexGuard<SnapshotLayerInner>> {
/*
// quick exit if already loaded
let mut inner = self.inner.lock().unwrap();
@@ -469,6 +480,9 @@ impl SnapshotLayer {
};
Ok(inner)
*/
todo!()
}
/// Create SnapshotLayers representing all files on disk
@@ -479,6 +493,7 @@ impl SnapshotLayer {
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> Result<Vec<Arc<SnapshotLayer>>> {
/*
let path = conf.timeline_path(&timelineid, &tenantid);
let mut snapfiles: Vec<Arc<SnapshotLayer>> = Vec::new();
@@ -506,6 +521,8 @@ impl SnapshotLayer {
}
}
return Ok(snapfiles);
*/
todo!()
}
pub fn delete(&self) -> Result<()> {
@@ -519,11 +536,14 @@ impl SnapshotLayer {
/// it will need to be loaded back.
///
pub fn unload(&self) -> Result<()> {
/*
let mut inner = self.inner.lock().unwrap();
inner.page_versions = BTreeMap::new();
inner.relsizes = BTreeMap::new();
inner.loaded = false;
Ok(())
*/
todo!()
}
/// debugging function to print out the contents of the layer

21
snapfile/Cargo.toml Normal file
View File

@@ -0,0 +1,21 @@
[package]
name = "snapfile"
version = "0.1.0"
edition = "2018"
[[bin]]
name = "snaptool"
path = "snaptool/main.rs"
[dependencies]
anyhow = "1.0"
aversion = "0.2"
bookfile = "0.3"
serde = { version = "1.0", features = ["derive"] }
rand = "0.8.3"
structopt = "0.3"
zenith_utils = { path = "../zenith_utils" }
hex = "0.4.3"
[dev-dependencies]
tempfile = "3.2"

64
snapfile/snaptool/main.rs Normal file
View File

@@ -0,0 +1,64 @@
use anyhow::{Context, Result};
use snapfile::{squash, SnapFile};
use std::env::current_dir;
use std::path::PathBuf;
use structopt::StructOpt;
#[derive(StructOpt)]
#[structopt(about = "A tool for manipulating snapshot files")]
enum Params {
Squash(Squash),
Describe(Describe),
}
#[derive(StructOpt)]
struct Squash {
older: PathBuf,
newer: PathBuf,
}
#[derive(StructOpt)]
struct Describe {
file: PathBuf,
}
fn print_errors(error: anyhow::Error) {
let formatted: Vec<_> = error.chain().map(ToString::to_string).collect();
eprintln!("{}", formatted.join(": "));
}
fn main() {
let res = snaptool_main();
if let Err(e) = res {
print_errors(e);
}
}
fn snaptool_main() -> Result<()> {
let params = Params::from_args();
match &params {
Params::Squash(squash_params) => {
let out_dir = current_dir()?;
squash(&squash_params.older, &squash_params.newer, &out_dir).with_context(|| {
format!(
"squash {} {}",
squash_params.older.to_string_lossy(),
squash_params.newer.to_string_lossy()
)
})?;
}
Params::Describe(describe_params) => {
describe(describe_params)
.with_context(|| format!("describe {}", describe_params.file.to_string_lossy()))?;
}
}
Ok(())
}
fn describe(params: &Describe) -> Result<()> {
let mut snap = SnapFile::new(&params.file)?;
let meta = snap.read_meta()?;
println!("{:?}: {:#?}", params.file, meta);
Ok(())
}

339
snapfile/src/lib.rs Normal file
View File

@@ -0,0 +1,339 @@
//! A file format for storage a snapshot of pages.
#![warn(missing_docs)]
#![forbid(unsafe_code)]
#![warn(clippy::cast_possible_truncation)]
mod page;
mod squash;
mod versioned;
#[doc(inline)]
pub use page::Page;
#[doc(inline)]
pub use squash::squash;
use anyhow::{bail, Context, Result};
use aversion::group::{DataSink, DataSourceExt};
use aversion::util::cbor::CborData;
use bookfile::{Book, BookWriter, ChapterWriter};
use std::ffi::OsString;
use std::fs::File;
use std::io::Write;
use std::ops::AddAssign;
use std::path::{Path, PathBuf};
pub use versioned::{PageIndex, PageLocation, Predecessor, SnapFileMeta};
use zenith_utils::lsn::Lsn;
impl SnapFileMeta {
pub fn new(previous: Option<SnapFileMeta>, timeline: [u8; 16], lsn: Lsn) -> Self {
// Store the metadata of the predecessor snapshot, if there is one.
let predecessor = previous.map(|prev| Predecessor {
timeline: prev.timeline,
lsn: prev.lsn,
});
SnapFileMeta {
timeline,
predecessor,
lsn: lsn.into(),
}
}
fn to_filename(&self) -> OsString {
let timeline_string = hex::encode(self.timeline);
let pred_lsn = match &self.predecessor {
None => 0,
Some(pred) => pred.lsn,
};
format!("{}_{:x}_{:x}.zdb", timeline_string, pred_lsn, self.lsn).into()
}
}
impl PageIndex {
/// Retrieve the page offset from the index.
///
/// If the page is not in the index, returns `None`.
fn get_page_location(&self, page_num: u64) -> Option<PageLocation> {
self.map.get(&page_num).copied()
}
fn page_count(&self) -> usize {
self.map.len()
}
}
impl PageLocation {
fn to_offset(&self) -> u64 {
// Counts in units of one page.
self.0 * 8192
}
}
impl AddAssign<u64> for PageLocation {
fn add_assign(&mut self, rhs: u64) {
self.0 += rhs;
}
}
/// A read-only snapshot file.
pub struct SnapFile {
book: Book<File>,
page_index: PageIndex,
}
impl SnapFile {
/// Open a new `SnapFile` for reading.
///
/// This call will validate some of the file's format and read the file's
/// metadata; it may return an error if the file format is invalid.
pub fn new(path: &Path) -> Result<Self> {
let file =
File::open(path).with_context(|| format!("snapfile {}", path.to_string_lossy()))?;
let book = Book::new(file)?;
if book.magic() != versioned::SNAPFILE_MAGIC {
bail!("bad magic number");
}
// Read the page index into memory.
let chapter_reader = book
.chapter_reader(versioned::CHAPTER_PAGE_INDEX)
.context("snapfile missing index chapter")?;
let mut source = CborData::new(chapter_reader);
let page_index: PageIndex = source.expect_message()?;
Ok(SnapFile { book, page_index })
}
/// Read the snapshot metadata.
pub fn read_meta(&mut self) -> Result<SnapFileMeta> {
let chapter_reader = self
.book
.chapter_reader(versioned::CHAPTER_SNAP_META)
.context("snapfile missing meta")?;
let mut source = CborData::new(chapter_reader);
let meta: SnapFileMeta = source.expect_message()?;
Ok(meta)
}
/// Return the number of pages stored in this snapshot.
pub fn page_count(&self) -> usize {
self.page_index.page_count()
}
/// Check if a page exists in this snapshot's index.
///
/// Returns `true` if the given page is stored in this snapshot file,
/// `false` if not.
pub fn has_page(&self, page_num: u64) -> bool {
self.page_index.get_page_location(page_num).is_some()
}
/// Read a page.
///
/// If this returns Ok(None), that means that this file does not store
/// the requested page.
/// This should only fail (returning `Err`) if an IO error occurs.
pub fn read_page(&self, page_num: u64) -> Result<Option<Page>> {
match self.page_index.get_page_location(page_num) {
None => Ok(None),
Some(page_offset) => Ok(Some(self._read_page(page_offset)?)),
}
}
/// Read page data from the file.
///
/// This does the work for read_page and PageIter.
fn _read_page(&self, page_location: PageLocation) -> Result<Page> {
// Compute the true byte offset in the file.
let page_offset = page_location.to_offset();
let chapter_reader = self
.book
.chapter_reader(versioned::CHAPTER_PAGES)
.context("snapfile missing pages chapter")?;
let mut page_data = Page::default();
let bytes_read = chapter_reader.read_at(page_data.as_mut(), page_offset)?;
if bytes_read != 8192 {
bail!("read truncated page");
}
Ok(page_data)
}
/// Iterate over pages.
///
/// This will return an iterator over (usize, )
pub fn all_pages(&self) -> PageIter {
let inner = (&self.page_index.map).into_iter();
PageIter {
snapfile: self,
inner,
}
}
}
/// An iterator over all pages in the snapshot file.
pub struct PageIter<'a> {
snapfile: &'a SnapFile,
inner: std::collections::btree_map::Iter<'a, u64, PageLocation>,
}
impl Iterator for PageIter<'_> {
type Item = Result<(u64, Page)>;
fn next(&mut self) -> Option<Self::Item> {
let (page_num, page_offset) = self.inner.next()?;
let result = self
.snapfile
._read_page(*page_offset)
.map(|page_data| (*page_num, page_data));
Some(result)
}
}
/// `SnapWriter` creates a new snapshot file.
///
/// A SnapWriter is created, has pages written into it, and is then closed.
pub struct SnapWriter {
writer: ChapterWriter<File>,
page_index: PageIndex,
meta: SnapFileMeta,
current_offset: PageLocation,
}
impl SnapWriter {
/// Create a new `SnapWriter`.
///
pub fn new(dir: &Path, meta: SnapFileMeta) -> Result<Self> {
let mut path = PathBuf::from(dir);
path.push(meta.to_filename());
let file = File::create(path)?;
let book = BookWriter::new(file, versioned::SNAPFILE_MAGIC)?;
// Write a chapter for the snapshot metadata.
let writer = book.new_chapter(versioned::CHAPTER_SNAP_META);
let mut sink = CborData::new(writer);
sink.write_message(&meta)?;
let book = sink.into_inner().close()?;
// Open a new chapter for raw page data.
let writer = book.new_chapter(versioned::CHAPTER_PAGES);
Ok(SnapWriter {
writer,
page_index: PageIndex::default(),
meta,
current_offset: PageLocation::default(),
})
}
/// Write a page into the snap file.
pub fn write_page<P>(&mut self, page_num: u64, page_data: P) -> Result<()>
where
P: Into<Page>,
{
let page_data: Page = page_data.into();
self.writer.write_all(page_data.as_ref())?;
let prev = self.page_index.map.insert(page_num, self.current_offset);
if prev.is_some() {
panic!("duplicate index for page {}", page_num);
}
self.current_offset += 1;
Ok(())
}
/// Finish writing pages.
///
/// This consumes the PagesWriter and completes the snapshot.
//
pub fn finish(self) -> Result<SnapFileMeta> {
let book = self.writer.close()?;
// Write out a page index and close the book. This will write out any
// necessary file metadata.
// FIXME: these 3 lines could be combined into a single function
// that means "serialize this data structure with this format into this chapter".
let writer = book.new_chapter(versioned::CHAPTER_PAGE_INDEX);
let mut sink = CborData::new(writer);
sink.write_message(&self.page_index)?;
// Close the chapter, then close the book.
sink.into_inner().close()?.close()?;
// Return the snapshot metadata to the caller.
Ok(self.meta)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use tempfile::TempDir;
const TEST_TIMELINE: [u8; 16] = [99u8; 16];
#[test]
fn snap_two_pages() {
// When `dir` goes out of scope the directory will be unlinked.
let dir = TempDir::new().unwrap();
let snap_meta = {
// Write out a new snapshot file with two pages.
let meta = SnapFileMeta::new(None, TEST_TIMELINE, Lsn(1234));
let mut snap = SnapWriter::new(dir.path(), meta).unwrap();
// Write the pages out of order, because why not?
let page99 = [99u8; 8192];
snap.write_page(99, page99).unwrap();
let page33 = [33u8; 8192];
snap.write_page(33, page33).unwrap();
snap.finish().unwrap()
};
assert_eq!(snap_meta.lsn, 1234);
{
// Read the snapshot file and verify the contents.
let mut path = PathBuf::from(dir.path());
path.push(snap_meta.to_filename());
let mut snap = SnapFile::new(&path).unwrap();
assert_eq!(snap.page_count(), 2);
assert!(!snap.has_page(0));
assert!(snap.has_page(33));
assert!(!snap.has_page(98));
assert!(snap.has_page(99));
assert!(snap.read_page(0).unwrap().is_none());
let page = snap.read_page(33).unwrap().unwrap();
assert_eq!(*page.0, [33u8; 8192]);
let page = snap.read_page(99).unwrap().unwrap();
assert_eq!(*page.0, [99u8; 8192]);
// Make sure the deserialized metadata matches what we think we wrote.
let meta2 = snap.read_meta().unwrap();
assert_eq!(snap_meta, meta2);
}
}
#[test]
fn snap_zero_pages() {
// When `dir` goes out of scope the directory will be unlinked.
let dir = TempDir::new().unwrap();
let snap_meta = {
// Write out a new snapshot file with no pages.
let meta = SnapFileMeta::new(None, TEST_TIMELINE, Lsn(1234));
let snap = SnapWriter::new(dir.path(), meta).unwrap();
snap.finish().unwrap()
};
{
// Read the snapshot file.
let mut path = PathBuf::from(dir.path());
path.push(snap_meta.to_filename());
let snap = SnapFile::new(&path).unwrap();
assert_eq!(snap.page_index.page_count(), 0);
assert!(!snap.has_page(0));
assert!(!snap.has_page(99));
assert!(snap.read_page(0).unwrap().is_none());
assert!(snap.read_page(99).unwrap().is_none());
}
}
}

42
snapfile/src/page.rs Normal file
View File

@@ -0,0 +1,42 @@
/// A single 8KB page.
pub struct Page(pub Box<[u8; 8192]>);
impl Page {
/// Create a page by copying bytes from another slice.
///
/// This is a copy, not a move. If the caller already has
/// an owned array then `From<[u8; 8192]>` can be used instead.
pub fn copy_slice(x: &[u8; 8192]) -> Self {
Page(Box::new(x.clone()))
}
}
impl Default for Page {
fn default() -> Self {
Page(Box::new([0u8; 8192]))
}
}
impl From<[u8; 8192]> for Page {
fn from(array: [u8; 8192]) -> Self {
Page(Box::new(array))
}
}
impl From<Box<[u8; 8192]>> for Page {
fn from(heap_array: Box<[u8; 8192]>) -> Self {
Page(heap_array)
}
}
impl AsRef<[u8; 8192]> for Page {
fn as_ref(&self) -> &[u8; 8192] {
self.0.as_ref()
}
}
impl AsMut<[u8; 8192]> for Page {
fn as_mut(&mut self) -> &mut [u8; 8192] {
self.0.as_mut()
}
}

100
snapfile/src/squash.rs Normal file
View File

@@ -0,0 +1,100 @@
use crate::{Page, PageIter, SnapFile, SnapFileMeta, SnapWriter};
use anyhow::{bail, Result};
use std::cmp::Ordering;
use std::path::Path;
// A helper struct that holds an iterator, along with the last
// value taken from the iterator.
struct PageStepper<'a> {
it: PageIter<'a>,
pub cache: Option<(u64, Page)>,
}
impl<'a> PageStepper<'a> {
fn new(snapfile: &'a SnapFile) -> Result<Self> {
let mut it = snapfile.all_pages();
let cache = it.next().transpose()?;
Ok(PageStepper { it, cache })
}
/// Read a new page from the iterator, returning the previous page.
fn step(&mut self) -> Result<Option<(u64, Page)>> {
let mut next = self.it.next().transpose()?;
std::mem::swap(&mut self.cache, &mut next);
Ok(next)
}
}
/// Squash two snapshot files into one.
///
/// The resulting snapshot will contain all of the pages from both files.
/// If the same page number is stored in both, it will keep the page from
/// the newer snapshot.
///
/// The name of the resulting file will be automatically generated from
/// the snapshot metadata.
pub fn squash(older: &Path, newer: &Path, out_dir: &Path) -> Result<()> {
let mut snap1 = SnapFile::new(older)?;
let mut snap2 = SnapFile::new(newer)?;
let meta1 = snap1.read_meta()?;
let meta2 = snap2.read_meta()?;
// Check that snap1 is the predecessor of snap2.
match meta2.predecessor {
Some(pred) if pred.timeline == meta1.timeline => {}
_ => {
bail!(
"snap file {:?} is not the predecessor of {:?}",
&older,
&newer,
);
}
}
// The new combined snapshot will have most fields from meta2 (the later
// snapshot), but will have the predecessor from meta1.
let new_meta = SnapFileMeta {
// There is some danger in squashing snapshots across two timelines,
// in that it's possible to get confused about what the history
// looks like. Ultimately, it should be possible to squash our way
// to a "complete" snapshot (that contains all pages), so this must
// be possible.
timeline: meta2.timeline,
predecessor: meta1.predecessor,
lsn: meta2.lsn,
};
let mut snap_writer = SnapWriter::new(&out_dir, new_meta)?;
let mut iter1 = PageStepper::new(&snap1)?;
let mut iter2 = PageStepper::new(&snap2)?;
loop {
let next_page = match (&iter1.cache, &iter2.cache) {
(None, None) => break,
(Some(_), None) => iter1.step()?,
(None, Some(_)) => iter2.step()?,
(Some(x), Some(y)) => {
// If these are two different page numbers, then advance the iterator
// with the numerically lower number.
// If they are the same page number, then store the one from the newer
// snapshot, and discard the other (advancing both iterators).
match x.0.cmp(&y.0) {
Ordering::Less => iter1.step()?,
Ordering::Greater => iter2.step()?,
Ordering::Equal => {
let _ = iter1.step()?;
iter2.step()?
}
}
}
};
// This can't be None, because we would already checked inside the match
// statement.
let (page_num, page_data) = next_page.unwrap();
snap_writer.write_page(page_num, page_data)?;
}
snap_writer.finish()?;
Ok(())
}

88
snapfile/src/versioned.rs Normal file
View File

@@ -0,0 +1,88 @@
//! Versioned data structures for snapshot files
//!
//! To ensure that future versions of software can read snapshot files,
//! all data structures that are serialized into the snapshot files should
//! live in this module.
//!
//! Once released, versioned data structures should never be modified.
//! Instead, new versions should be created and conversion functions should
//! be defined using the `FromVersion` trait.
use aversion::{assign_message_ids, UpgradeLatest, Versioned};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
// A random constant, to identify this file type.
pub(crate) const SNAPFILE_MAGIC: u32 = 0x7fb8_38a8;
// Constant chapter numbers
// FIXME: the bookfile crate should use something better to index, e.g. strings.
/// Snapshot-specific file metadata
pub(crate) const CHAPTER_SNAP_META: u64 = 1;
/// A packed set of 8KB pages.
pub(crate) const CHAPTER_PAGES: u64 = 2;
/// An index of pages.
pub(crate) const CHAPTER_PAGE_INDEX: u64 = 3;
/// Information about the predecessor snapshot.
///
/// It contains the snap_id of the predecessor snapshot, and the LSN
/// of that snapshot.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Predecessor {
/// This is the ID number of the predecessor timeline.
///
/// This may match the current snapshot's timeline id, but
/// it may not (if the precessor was the branch point).
pub timeline: [u8; 16],
/// This is the LSN of the predecessor snapshot.
pub lsn: u64,
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Versioned, UpgradeLatest)]
pub struct SnapFileMetaV1 {
/// This is a unique ID number for this timeline.
///
/// This number guarantees that snapshot history is unique.
pub timeline: [u8; 16],
/// Information about the predecessor snapshot.
///
/// If `None`, this snapshot is the start of a new database.
pub predecessor: Option<Predecessor>,
/// This is the last LSN stored in this snapshot.
pub lsn: u64,
}
/// A type alias for the latest version of `SnapFileMeta`.
pub type SnapFileMeta = SnapFileMetaV1;
/// A page location within a file.
///
/// Note: this is an opaque value that may not be the true byte offset;
/// it may be relative to some other location or measured in units other
/// than bytes.
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(transparent)]
pub struct PageLocationV1(pub(crate) u64);
/// A type alias for the latest version of `PageLocation`.
pub type PageLocation = PageLocationV1;
/// An index from page number to offset within the pages chapter.
#[derive(Debug, Default, Serialize, Deserialize, Versioned, UpgradeLatest)]
pub struct PageIndexV1 {
/// A map from page number to file offset.
pub(crate) map: BTreeMap<u64, PageLocationV1>,
}
/// A type alias for the latest version of `PageIndex`.
pub type PageIndex = PageIndexV1;
// Each message gets a unique message id, for tracking by the aversion traits.
assign_message_ids! {
PageIndex: 100,
SnapFileMeta: 101,
}