add snapshot squashing

Add logic to squash snapshot files.
Add snaptool (a binary for inspecting and manipulating snapshot files).
Use bookfile 0.3, which allows concurrent reads.
This commit is contained in:
Eric Seppanen
2021-07-13 16:56:34 -07:00
parent acfc5c5d21
commit c3833ef0f4
6 changed files with 345 additions and 68 deletions

80
Cargo.lock generated
View File

@@ -190,18 +190,6 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bookfile"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7db391acd99b8bdce5d5a66ca28530761affec9a407df91aee668fc318e3db71"
dependencies = [
"aversion",
"byteorder",
"serde",
"thiserror",
]
[[package]]
name = "bookfile"
version = "0.3.0"
@@ -715,6 +703,15 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -1193,7 +1190,7 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"bookfile 0.3.0",
"bookfile",
"byteorder",
"bytes",
"chrono",
@@ -1415,6 +1412,30 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -1950,9 +1971,10 @@ version = "0.1.0"
dependencies = [
"anyhow",
"aversion",
"bookfile 0.2.0",
"bookfile",
"rand",
"serde",
"structopt",
"tempfile",
"zenith_utils",
]
@@ -1989,6 +2011,30 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "structopt"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69b041cdcb67226aca307e6e7be44c8806423d83e018bd662360a93dabce4d71"
dependencies = [
"clap",
"lazy_static",
"structopt-derive",
]
[[package]]
name = "structopt-derive"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7813934aecf5f51a54775e00068c237de98489463968231a51746bbbc03f9c10"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "subtle"
version = "2.4.1"
@@ -2297,6 +2343,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-width"
version = "0.1.8"

View File

@@ -3,14 +3,17 @@ name = "snapfile"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
name = "snaptool"
path = "snaptool/main.rs"
[dependencies]
anyhow = "1.0"
aversion = "0.2"
bookfile = "^0.2"
bookfile = "0.3"
serde = { version = "1.0", features = ["derive"] }
rand = "0.8.3"
structopt = "0.3"
zenith_utils = { path = "../zenith_utils" }
[dev-dependencies]

64
snapfile/snaptool/main.rs Normal file
View File

@@ -0,0 +1,64 @@
use anyhow::{Context, Result};
use snapfile::{squash, SnapFile};
use std::env::current_dir;
use std::path::PathBuf;
use structopt::StructOpt;
#[derive(StructOpt)]
#[structopt(about = "A tool for manipulating snapshot files")]
enum Params {
Squash(Squash),
Describe(Describe),
}
#[derive(StructOpt)]
struct Squash {
older: PathBuf,
newer: PathBuf,
}
#[derive(StructOpt)]
struct Describe {
file: PathBuf,
}
fn print_errors(error: anyhow::Error) {
let formatted: Vec<_> = error.chain().map(ToString::to_string).collect();
eprintln!("{}", formatted.join(": "));
}
fn main() {
let res = snaptool_main();
if let Err(e) = res {
print_errors(e);
}
}
fn snaptool_main() -> Result<()> {
let params = Params::from_args();
match &params {
Params::Squash(squash_params) => {
let out_dir = current_dir()?;
squash(&squash_params.older, &squash_params.newer, &out_dir).with_context(|| {
format!(
"squash {} {}",
squash_params.older.to_string_lossy(),
squash_params.newer.to_string_lossy()
)
})?;
}
Params::Describe(describe_params) => {
describe(describe_params)
.with_context(|| format!("describe {}", describe_params.file.to_string_lossy()))?;
}
}
Ok(())
}
fn describe(params: &Describe) -> Result<()> {
let mut snap = SnapFile::new(&params.file)?;
let meta = snap.read_meta()?;
println!("{:?}: {:#?}", params.file, meta);
Ok(())
}

View File

@@ -5,23 +5,29 @@
#![warn(clippy::cast_possible_truncation)]
mod page;
mod squash;
mod versioned;
#[doc(inline)]
pub use page::Page;
use anyhow::{anyhow, bail, Result};
#[doc(inline)]
pub use squash::squash;
use anyhow::{bail, Context, Result};
use aversion::group::{DataSink, DataSourceExt};
use aversion::util::cbor::CborData;
use bookfile::{Book, BookWriter, ChapterIndex, ChapterWriter};
use bookfile::{Book, BookWriter, ChapterWriter};
use std::ffi::OsString;
use std::fs::File;
use std::io::Write;
use std::ops::AddAssign;
use std::path::{Path, PathBuf};
use versioned::{PageIndex, Predecessor, SnapFileMeta};
pub use versioned::{PageIndex, PageLocation, Predecessor, SnapFileMeta};
use zenith_utils::lsn::Lsn;
impl SnapFileMeta {
fn new(previous: Option<SnapFileMeta>, lsn: u64) -> Self {
pub fn new(previous: Option<SnapFileMeta>, lsn: Lsn) -> Self {
// Store the metadata of the predecessor snapshot, if there is one.
let predecessor = previous.map(|prev| Predecessor {
id: prev.snap_id,
@@ -32,7 +38,7 @@ impl SnapFileMeta {
SnapFileMeta {
snap_id,
predecessor,
lsn,
lsn: lsn.into(),
}
}
@@ -45,7 +51,7 @@ impl PageIndex {
/// Retrieve the page offset from the index.
///
/// If the page is not in the index, returns `None`.
fn get_page_offset(&self, page_num: u64) -> Option<u64> {
fn get_page_location(&self, page_num: u64) -> Option<PageLocation> {
self.map.get(&page_num).copied()
}
@@ -54,11 +60,23 @@ impl PageIndex {
}
}
impl PageLocation {
fn to_offset(&self) -> u64 {
// Counts in units of one page.
self.0 * 8192
}
}
impl AddAssign<u64> for PageLocation {
fn add_assign(&mut self, rhs: u64) {
self.0 += rhs;
}
}
/// A read-only snapshot file.
pub struct SnapFile {
book: Book<File>,
page_index: PageIndex,
page_chapter_num: ChapterIndex,
}
impl SnapFile {
@@ -67,36 +85,28 @@ impl SnapFile {
/// This call will validate some of the file's format and read the file's
/// metadata; it may return an error if the file format is invalid.
pub fn new(path: &Path) -> Result<Self> {
let file = File::open(path)?;
let mut book = Book::new(file)?;
let file =
File::open(path).with_context(|| format!("snapfile {}", path.to_string_lossy()))?;
let book = Book::new(file)?;
if book.magic() != versioned::SNAPFILE_MAGIC {
bail!("bad magic number");
}
// Read the page index into memory.
let chapter_num = book
.find_chapter(versioned::CHAPTER_PAGE_INDEX)
.ok_or_else(|| anyhow!("snapfile missing index chapter"))?;
let chapter_reader = book.chapter_reader(chapter_num)?;
let chapter_reader = book
.chapter_reader(versioned::CHAPTER_PAGE_INDEX)
.context("snapfile missing index chapter")?;
let mut source = CborData::new(chapter_reader);
let page_index: PageIndex = source.expect_message()?;
let page_chapter_num = book
.find_chapter(versioned::CHAPTER_PAGES)
.ok_or_else(|| anyhow!("snapfile missing pages chapter"))?;
Ok(SnapFile {
book,
page_index,
page_chapter_num,
})
Ok(SnapFile { book, page_index })
}
/// Read the snapshot metadata.
pub fn read_meta(&mut self) -> Result<SnapFileMeta> {
let chapter_num = self
let chapter_reader = self
.book
.find_chapter(versioned::CHAPTER_SNAP_META)
.ok_or_else(|| anyhow!("snapfile missing meta"))?;
let chapter_reader = self.book.chapter_reader(chapter_num)?;
.chapter_reader(versioned::CHAPTER_SNAP_META)
.context("snapfile missing meta")?;
let mut source = CborData::new(chapter_reader);
let meta: SnapFileMeta = source.expect_message()?;
Ok(meta)
@@ -112,7 +122,7 @@ impl SnapFile {
/// Returns `true` if the given page is stored in this snapshot file,
/// `false` if not.
pub fn has_page(&self, page_num: u64) -> bool {
self.page_index.get_page_offset(page_num).is_some()
self.page_index.get_page_location(page_num).is_some()
}
/// Read a page.
@@ -120,22 +130,61 @@ impl SnapFile {
/// If this returns Ok(None), that means that this file does not store
/// the requested page.
/// This should only fail (returning `Err`) if an IO error occurs.
pub fn read_page(&mut self, page_num: u64) -> Result<Option<Page>> {
match self.page_index.get_page_offset(page_num) {
pub fn read_page(&self, page_num: u64) -> Result<Option<Page>> {
match self.page_index.get_page_location(page_num) {
None => Ok(None),
Some(page_offset) => {
// Compute the true byte offset in the file.
let page_offset = page_offset * 8192;
let chapter_reader = self.book.chapter_reader(self.page_chapter_num)?;
let mut page_data = Page::default();
let bytes_read = chapter_reader.read_at(page_data.as_mut(), page_offset)?;
if bytes_read != 8192 {
bail!("read truncated page");
}
Ok(Some(page_data))
}
Some(page_offset) => Ok(Some(self._read_page(page_offset)?)),
}
}
/// Read page data from the file.
///
/// This does the work for read_page and PageIter.
fn _read_page(&self, page_location: PageLocation) -> Result<Page> {
// Compute the true byte offset in the file.
let page_offset = page_location.to_offset();
let chapter_reader = self
.book
.chapter_reader(versioned::CHAPTER_PAGES)
.context("snapfile missing pages chapter")?;
let mut page_data = Page::default();
let bytes_read = chapter_reader.read_at(page_data.as_mut(), page_offset)?;
if bytes_read != 8192 {
bail!("read truncated page");
}
Ok(page_data)
}
/// Iterate over pages.
///
/// This will return an iterator over (usize, )
pub fn all_pages(&self) -> PageIter {
let inner = (&self.page_index.map).into_iter();
PageIter {
snapfile: self,
inner,
}
}
}
/// An iterator over all pages in the snapshot file.
pub struct PageIter<'a> {
snapfile: &'a SnapFile,
inner: std::collections::btree_map::Iter<'a, u64, PageLocation>,
}
impl Iterator for PageIter<'_> {
type Item = Result<(u64, Page)>;
fn next(&mut self) -> Option<Self::Item> {
let (page_num, page_offset) = self.inner.next()?;
let result = self
.snapfile
._read_page(*page_offset)
.map(|page_data| (*page_num, page_data));
Some(result)
}
}
/// `SnapWriter` creates a new snapshot file.
@@ -145,18 +194,13 @@ pub struct SnapWriter {
writer: ChapterWriter<File>,
page_index: PageIndex,
meta: SnapFileMeta,
current_offset: u64,
current_offset: PageLocation,
}
impl SnapWriter {
/// Create a new `SnapWriter`.
///
/// The LSN is the last page update present in this snapshot.
///
/// If this is an incremental snapshot, supply the metadata of the previous
/// snapshot.
pub fn new(dir: &Path, previous: Option<SnapFileMeta>, lsn: Lsn) -> Result<Self> {
let meta = SnapFileMeta::new(previous, lsn.into());
pub fn new(dir: &Path, meta: SnapFileMeta) -> Result<Self> {
let mut path = PathBuf::from(dir);
path.push(meta.to_filename());
let file = File::create(path)?;
@@ -174,7 +218,7 @@ impl SnapWriter {
writer,
page_index: PageIndex::default(),
meta,
current_offset: 0,
current_offset: PageLocation::default(),
})
}
@@ -228,7 +272,8 @@ mod tests {
let dir = TempDir::new().unwrap();
let snap_meta = {
// Write out a new snapshot file with two pages.
let mut snap = SnapWriter::new(dir.path(), None, Lsn(1234)).unwrap();
let meta = SnapFileMeta::new(None, Lsn(1234));
let mut snap = SnapWriter::new(dir.path(), meta).unwrap();
// Write the pages out of order, because why not?
let page99 = [99u8; 8192];
snap.write_page(99, page99).unwrap();
@@ -268,7 +313,8 @@ mod tests {
let dir = TempDir::new().unwrap();
let snap_meta = {
// Write out a new snapshot file with no pages.
let snap = SnapWriter::new(dir.path(), None, Lsn(1234)).unwrap();
let meta = SnapFileMeta::new(None, Lsn(1234));
let snap = SnapWriter::new(dir.path(), meta).unwrap();
snap.finish().unwrap()
};
@@ -276,7 +322,7 @@ mod tests {
// Read the snapshot file.
let mut path = PathBuf::from(dir.path());
path.push(snap_meta.to_filename());
let mut snap = SnapFile::new(&path).unwrap();
let snap = SnapFile::new(&path).unwrap();
assert_eq!(snap.page_index.page_count(), 0);
assert!(!snap.has_page(0));
assert!(!snap.has_page(99));

99
snapfile/src/squash.rs Normal file
View File

@@ -0,0 +1,99 @@
use crate::{Page, PageIter, SnapFile, SnapFileMeta, SnapWriter};
use anyhow::{bail, Result};
use std::cmp::Ordering;
use std::path::Path;
// A helper struct that holds an iterator, along with the last
// value taken from the iterator.
struct PageStepper<'a> {
it: PageIter<'a>,
pub cache: Option<(u64, Page)>,
}
impl<'a> PageStepper<'a> {
fn new(snapfile: &'a SnapFile) -> Result<Self> {
let mut it = snapfile.all_pages();
let cache = it.next().transpose()?;
Ok(PageStepper { it, cache })
}
/// Read a new page from the iterator, returning the previous page.
fn step(&mut self) -> Result<Option<(u64, Page)>> {
let mut next = self.it.next().transpose()?;
std::mem::swap(&mut self.cache, &mut next);
Ok(next)
}
}
/// Squash two snapshot files into one.
///
/// The resulting snapshot will contain all of the pages from both files.
/// If the same page number is stored in both, it will keep the page from
/// the newer snapshot.
///
/// The name of the resulting file will be automatically generated from
/// the snapshot metadata.
pub fn squash(older: &Path, newer: &Path, out_dir: &Path) -> Result<()> {
let mut snap1 = SnapFile::new(older)?;
let mut snap2 = SnapFile::new(newer)?;
let meta1 = snap1.read_meta()?;
let meta2 = snap2.read_meta()?;
// Check that snap1 is the predecessor of snap2.
match meta2.predecessor {
Some(pred) if pred.id == meta1.snap_id => {}
_ => {
bail!(
"snap file {:?} is not the predecessor of {:?}",
&older,
&newer,
);
}
}
// The new combined snapshot will have most fields from meta2 (the later
// snapshot), but will have the predecessor from meta1.
let new_meta = SnapFileMeta {
// FIXME: Wow, this seems wrong. Need to sort out what to do here.
// should snap_id even exist? The fact that we plan on squashing
// snapshots often implies that maybe they shouldn't.
// How do we identify predecessor? timeline_id + lsn?
snap_id: meta2.snap_id,
predecessor: meta1.predecessor,
lsn: meta2.lsn,
};
let mut snap_writer = SnapWriter::new(&out_dir, new_meta)?;
let mut iter1 = PageStepper::new(&snap1)?;
let mut iter2 = PageStepper::new(&snap2)?;
loop {
let next_page = match (&iter1.cache, &iter2.cache) {
(None, None) => break,
(Some(_), None) => iter1.step()?,
(None, Some(_)) => iter2.step()?,
(Some(x), Some(y)) => {
// If these are two different page numbers, then advance the iterator
// with the numerically lower number.
// If they are the same page number, then store the one from the newer
// snapshot, and discard the other (advancing both iterators).
match x.0.cmp(&y.0) {
Ordering::Less => iter1.step()?,
Ordering::Greater => iter2.step()?,
Ordering::Equal => {
let _ = iter1.step()?;
iter2.step()?
}
}
}
};
// This can't be None, because we would already checked inside the match
// statement.
let (page_num, page_data) = next_page.unwrap();
snap_writer.write_page(page_num, page_data)?;
}
snap_writer.finish()?;
Ok(())
}

View File

@@ -55,16 +55,29 @@ pub struct SnapFileMetaV1 {
pub lsn: u64,
}
/// A type alias for the latest version of `SnapFileMeta`.
pub type SnapFileMeta = SnapFileMetaV1;
/// A page location within a file.
///
/// Note: this is an opaque value that may not be the true byte offset;
/// it may be relative to some other location or measured in units other
/// than bytes.
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(transparent)]
pub struct PageLocationV1(pub(crate) u64);
/// A type alias for the latest version of `PageLocation`.
pub type PageLocation = PageLocationV1;
/// An index from page number to offset within the pages chapter.
#[derive(Debug, Default, Serialize, Deserialize, Versioned, UpgradeLatest)]
pub struct PageIndexV1 {
/// A map from page number to file offset.
pub(crate) map: BTreeMap<u64, u64>,
pub(crate) map: BTreeMap<u64, PageLocationV1>,
}
// A placeholder type, that will always point to the latest version.
/// A type alias for the latest version of `PageIndex`.
pub type PageIndex = PageIndexV1;
// Each message gets a unique message id, for tracking by the aversion traits.