mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-09 05:30:37 +00:00
Compare commits
7 Commits
proxy-http
...
snapfile
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5b79e033bd | ||
|
|
93b2e49939 | ||
|
|
c3833ef0f4 | ||
|
|
acfc5c5d21 | ||
|
|
0a0d12368e | ||
|
|
8d2b517359 | ||
|
|
26bcd72619 |
78
Cargo.lock
generated
78
Cargo.lock
generated
@@ -703,6 +703,15 @@ version = "0.11.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "heck"
|
||||||
|
version = "0.3.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
|
||||||
|
dependencies = [
|
||||||
|
"unicode-segmentation",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermit-abi"
|
name = "hermit-abi"
|
||||||
version = "0.1.19"
|
version = "0.1.19"
|
||||||
@@ -1403,6 +1412,30 @@ version = "0.2.10"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
|
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "proc-macro-error"
|
||||||
|
version = "1.0.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro-error-attr",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
"version_check",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "proc-macro-error-attr"
|
||||||
|
version = "1.0.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"version_check",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro-hack"
|
name = "proc-macro-hack"
|
||||||
version = "0.5.19"
|
version = "0.5.19"
|
||||||
@@ -1932,6 +1965,21 @@ version = "1.6.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
|
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "snapfile"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"aversion",
|
||||||
|
"bookfile",
|
||||||
|
"hex",
|
||||||
|
"rand",
|
||||||
|
"serde",
|
||||||
|
"structopt",
|
||||||
|
"tempfile",
|
||||||
|
"zenith_utils",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "socket2"
|
name = "socket2"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
@@ -1964,6 +2012,30 @@ version = "0.8.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
|
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "structopt"
|
||||||
|
version = "0.3.22"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "69b041cdcb67226aca307e6e7be44c8806423d83e018bd662360a93dabce4d71"
|
||||||
|
dependencies = [
|
||||||
|
"clap",
|
||||||
|
"lazy_static",
|
||||||
|
"structopt-derive",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "structopt-derive"
|
||||||
|
version = "0.4.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7813934aecf5f51a54775e00068c237de98489463968231a51746bbbc03f9c10"
|
||||||
|
dependencies = [
|
||||||
|
"heck",
|
||||||
|
"proc-macro-error",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "subtle"
|
name = "subtle"
|
||||||
version = "2.4.1"
|
version = "2.4.1"
|
||||||
@@ -2272,6 +2344,12 @@ dependencies = [
|
|||||||
"tinyvec",
|
"tinyvec",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unicode-segmentation"
|
||||||
|
version = "1.8.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "unicode-width"
|
name = "unicode-width"
|
||||||
version = "0.1.8"
|
version = "0.1.8"
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ members = [
|
|||||||
"zenith",
|
"zenith",
|
||||||
"zenith_metrics",
|
"zenith_metrics",
|
||||||
"zenith_utils",
|
"zenith_utils",
|
||||||
|
"snapfile",
|
||||||
]
|
]
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ use zenith_utils::seqwait::SeqWait;
|
|||||||
|
|
||||||
mod inmemory_layer;
|
mod inmemory_layer;
|
||||||
mod layer_map;
|
mod layer_map;
|
||||||
|
mod page_history;
|
||||||
mod snapshot_layer;
|
mod snapshot_layer;
|
||||||
mod storage_layer;
|
mod storage_layer;
|
||||||
|
|
||||||
|
|||||||
@@ -2,15 +2,15 @@
|
|||||||
//! An in-memory layer stores recently received page versions in memory. The page versions
|
//! An in-memory layer stores recently received page versions in memory. The page versions
|
||||||
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
|
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
|
||||||
//!
|
//!
|
||||||
|
use crate::layered_repository::page_history::PageHistory;
|
||||||
use crate::layered_repository::storage_layer::{
|
use crate::layered_repository::storage_layer::{
|
||||||
Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
||||||
};
|
};
|
||||||
use crate::layered_repository::LayeredTimeline;
|
use crate::layered_repository::{LayeredTimeline, SnapshotLayer};
|
||||||
use crate::layered_repository::SnapshotLayer;
|
|
||||||
use crate::repository::WALRecord;
|
use crate::repository::WALRecord;
|
||||||
use crate::PageServerConf;
|
use crate::PageServerConf;
|
||||||
use crate::{ZTenantId, ZTimelineId};
|
use crate::{ZTenantId, ZTimelineId};
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use log::*;
|
use log::*;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
@@ -43,9 +43,9 @@ pub struct InMemoryLayerInner {
|
|||||||
|
|
||||||
///
|
///
|
||||||
/// All versions of all pages in the layer are are kept here.
|
/// All versions of all pages in the layer are are kept here.
|
||||||
/// Indexed by block number and LSN.
|
/// Indexed by block number.
|
||||||
///
|
///
|
||||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
pages: BTreeMap<u32, PageHistory>,
|
||||||
|
|
||||||
///
|
///
|
||||||
/// `segsizes` tracks the size of the segment at different points in time.
|
/// `segsizes` tracks the size of the segment at different points in time.
|
||||||
@@ -90,29 +90,32 @@ impl Layer for InMemoryLayer {
|
|||||||
) -> Result<Option<Lsn>> {
|
) -> Result<Option<Lsn>> {
|
||||||
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
|
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
|
||||||
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
||||||
|
|
||||||
assert!(self.seg.blknum_in_seg(blknum));
|
assert!(self.seg.blknum_in_seg(blknum));
|
||||||
|
|
||||||
{
|
{
|
||||||
let inner = self.inner.lock().unwrap();
|
let inner = self.inner.lock().unwrap();
|
||||||
let minkey = (blknum, Lsn(0));
|
let pages = &inner.pages;
|
||||||
let maxkey = (blknum, lsn);
|
|
||||||
let mut iter = inner
|
// FIXME: this assumes the latest page version is always the right answer.
|
||||||
.page_versions
|
// How should this work if the requested lsn is in the past? in the future?
|
||||||
.range((Included(&minkey), Included(&maxkey)));
|
|
||||||
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
|
let latest_version = pages
|
||||||
|
.get(&blknum)
|
||||||
|
.and_then(PageHistory::latest)
|
||||||
|
.ok_or_else(|| anyhow!("page not found"))?;
|
||||||
|
|
||||||
|
let (entry_lsn, entry) = latest_version;
|
||||||
|
if true {
|
||||||
if let Some(img) = &entry.page_image {
|
if let Some(img) = &entry.page_image {
|
||||||
reconstruct_data.page_img = Some(img.clone());
|
reconstruct_data.page_img = Some(img.clone());
|
||||||
need_base_image_lsn = None;
|
need_base_image_lsn = None;
|
||||||
break;
|
|
||||||
} else if let Some(rec) = &entry.record {
|
} else if let Some(rec) = &entry.record {
|
||||||
reconstruct_data.records.push(rec.clone());
|
reconstruct_data.records.push(rec.clone());
|
||||||
if rec.will_init {
|
if rec.will_init {
|
||||||
// This WAL record initializes the page, so no need to go further back
|
// This WAL record initializes the page, so no need to go further back
|
||||||
need_base_image_lsn = None;
|
need_base_image_lsn = None;
|
||||||
break;
|
|
||||||
} else {
|
} else {
|
||||||
need_base_image_lsn = Some(*entry_lsn);
|
need_base_image_lsn = Some(entry_lsn);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// No base image, and no WAL record. Huh?
|
// No base image, and no WAL record. Huh?
|
||||||
@@ -120,7 +123,7 @@ impl Layer for InMemoryLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// release lock on 'page_versions'
|
// release lock on self.pages
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(need_base_image_lsn)
|
Ok(need_base_image_lsn)
|
||||||
@@ -184,7 +187,7 @@ impl InMemoryLayer {
|
|||||||
start_lsn,
|
start_lsn,
|
||||||
inner: Mutex::new(InMemoryLayerInner {
|
inner: Mutex::new(InMemoryLayerInner {
|
||||||
drop_lsn: None,
|
drop_lsn: None,
|
||||||
page_versions: BTreeMap::new(),
|
pages: BTreeMap::new(),
|
||||||
segsizes: BTreeMap::new(),
|
segsizes: BTreeMap::new(),
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
@@ -230,15 +233,11 @@ impl InMemoryLayer {
|
|||||||
);
|
);
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
|
||||||
let old = inner.page_versions.insert((blknum, lsn), pv);
|
let page_history = inner
|
||||||
|
.pages
|
||||||
if old.is_some() {
|
.entry(blknum)
|
||||||
// We already had an entry for this LSN. That's odd..
|
.or_insert_with(PageHistory::default);
|
||||||
warn!(
|
page_history.push(lsn, pv);
|
||||||
"Page version of rel {} blk {} at {} already exists",
|
|
||||||
self.seg.rel, blknum, lsn
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Also update the relation size, if this extended the relation.
|
// Also update the relation size, if this extended the relation.
|
||||||
if self.seg.rel.is_blocky() {
|
if self.seg.rel.is_blocky() {
|
||||||
@@ -311,7 +310,7 @@ impl InMemoryLayer {
|
|||||||
timelineid,
|
timelineid,
|
||||||
lsn
|
lsn
|
||||||
);
|
);
|
||||||
let mut page_versions = BTreeMap::new();
|
let mut pages = BTreeMap::new();
|
||||||
let mut segsizes = BTreeMap::new();
|
let mut segsizes = BTreeMap::new();
|
||||||
|
|
||||||
let seg = src.get_seg_tag();
|
let seg = src.get_seg_tag();
|
||||||
@@ -333,7 +332,8 @@ impl InMemoryLayer {
|
|||||||
page_image: Some(img),
|
page_image: Some(img),
|
||||||
record: None,
|
record: None,
|
||||||
};
|
};
|
||||||
page_versions.insert((blknum, lsn), pv);
|
let page_history = PageHistory::from_image(lsn, pv);
|
||||||
|
pages.insert(blknum, page_history);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(InMemoryLayer {
|
Ok(InMemoryLayer {
|
||||||
@@ -344,8 +344,8 @@ impl InMemoryLayer {
|
|||||||
start_lsn: lsn,
|
start_lsn: lsn,
|
||||||
inner: Mutex::new(InMemoryLayerInner {
|
inner: Mutex::new(InMemoryLayerInner {
|
||||||
drop_lsn: None,
|
drop_lsn: None,
|
||||||
page_versions: page_versions,
|
pages,
|
||||||
segsizes: segsizes,
|
segsizes,
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -388,10 +388,11 @@ impl InMemoryLayer {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
|
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
|
||||||
let mut before_page_versions;
|
let mut before_pages = BTreeMap::new();
|
||||||
let mut before_segsizes;
|
let mut before_segsizes;
|
||||||
let mut after_page_versions;
|
|
||||||
let mut after_segsizes;
|
let mut after_segsizes;
|
||||||
|
let mut after_pages = BTreeMap::new();
|
||||||
|
|
||||||
if !dropped {
|
if !dropped {
|
||||||
before_segsizes = BTreeMap::new();
|
before_segsizes = BTreeMap::new();
|
||||||
after_segsizes = BTreeMap::new();
|
after_segsizes = BTreeMap::new();
|
||||||
@@ -403,20 +404,16 @@ impl InMemoryLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
before_page_versions = BTreeMap::new();
|
for (blknum, page_history) in inner.pages.iter() {
|
||||||
after_page_versions = BTreeMap::new();
|
let (old, new) = page_history.clone().split_at(end_lsn);
|
||||||
for ((blknum, lsn), pv) in inner.page_versions.iter() {
|
before_pages.insert(*blknum, old);
|
||||||
if *lsn > end_lsn {
|
after_pages.insert(*blknum, new);
|
||||||
after_page_versions.insert((*blknum, *lsn), pv.clone());
|
|
||||||
} else {
|
|
||||||
before_page_versions.insert((*blknum, *lsn), pv.clone());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
before_page_versions = inner.page_versions.clone();
|
before_pages = inner.pages.clone();
|
||||||
before_segsizes = inner.segsizes.clone();
|
before_segsizes = inner.segsizes.clone();
|
||||||
after_segsizes = BTreeMap::new();
|
after_segsizes = BTreeMap::new();
|
||||||
after_page_versions = BTreeMap::new();
|
after_pages = BTreeMap::new();
|
||||||
}
|
}
|
||||||
|
|
||||||
// we can release the lock now.
|
// we can release the lock now.
|
||||||
@@ -431,13 +428,13 @@ impl InMemoryLayer {
|
|||||||
self.start_lsn,
|
self.start_lsn,
|
||||||
end_lsn,
|
end_lsn,
|
||||||
dropped,
|
dropped,
|
||||||
before_page_versions,
|
before_pages,
|
||||||
before_segsizes,
|
before_segsizes,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// If there were any "new" page versions, initialize a new in-memory layer to hold
|
// If there were any "new" page versions, initialize a new in-memory layer to hold
|
||||||
// them
|
// them
|
||||||
let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
|
let new_open = if !after_segsizes.is_empty() || !after_pages.is_empty() {
|
||||||
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
|
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
|
||||||
|
|
||||||
let new_open = Self::copy_snapshot(
|
let new_open = Self::copy_snapshot(
|
||||||
@@ -449,7 +446,7 @@ impl InMemoryLayer {
|
|||||||
end_lsn,
|
end_lsn,
|
||||||
)?;
|
)?;
|
||||||
let mut new_inner = new_open.inner.lock().unwrap();
|
let mut new_inner = new_open.inner.lock().unwrap();
|
||||||
new_inner.page_versions.append(&mut after_page_versions);
|
new_inner.pages.append(&mut after_pages);
|
||||||
new_inner.segsizes.append(&mut after_segsizes);
|
new_inner.segsizes.append(&mut after_segsizes);
|
||||||
drop(new_inner);
|
drop(new_inner);
|
||||||
|
|
||||||
@@ -476,14 +473,16 @@ impl InMemoryLayer {
|
|||||||
for (k, v) in inner.segsizes.iter() {
|
for (k, v) in inner.segsizes.iter() {
|
||||||
result += &format!("{}: {}\n", k, v);
|
result += &format!("{}: {}\n", k, v);
|
||||||
}
|
}
|
||||||
for (k, v) in inner.page_versions.iter() {
|
for (page_num, page_history) in inner.pages.iter() {
|
||||||
result += &format!(
|
for (lsn, image) in page_history.iter() {
|
||||||
"blk {} at {}: {}/{}\n",
|
result += &format!(
|
||||||
k.0,
|
"blk {} at {}: {}/{}\n",
|
||||||
k.1,
|
page_num,
|
||||||
v.page_image.is_some(),
|
lsn,
|
||||||
v.record.is_some()
|
image.page_image.is_some(),
|
||||||
);
|
image.record.is_some()
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result
|
result
|
||||||
|
|||||||
94
pageserver/src/layered_repository/page_history.rs
Normal file
94
pageserver/src/layered_repository/page_history.rs
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
use super::storage_layer::PageVersion;
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use zenith_utils::lsn::Lsn;
|
||||||
|
|
||||||
|
/// A data structure that holds one or more versions of a particular page number.
|
||||||
|
//
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
pub struct PageHistory {
|
||||||
|
/// Pages stored in order, from oldest to newest.
|
||||||
|
pages: VecDeque<(Lsn, PageVersion)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PageHistory {
|
||||||
|
/// Create a new PageHistory containing a single image.
|
||||||
|
pub fn from_image(lsn: Lsn, image: PageVersion) -> Self {
|
||||||
|
let mut pages = VecDeque::new();
|
||||||
|
pages.push_back((lsn, image));
|
||||||
|
PageHistory { pages }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Push a newer page image.
|
||||||
|
pub fn push(&mut self, lsn: Lsn, page: PageVersion) {
|
||||||
|
if let Some((back_lsn, _)) = self.pages.back() {
|
||||||
|
debug_assert_ne!(
|
||||||
|
back_lsn, &lsn,
|
||||||
|
"push page at lsn {:?} but one already exists",
|
||||||
|
lsn
|
||||||
|
);
|
||||||
|
debug_assert!(back_lsn < &lsn, "pushed page is older than latest lsn");
|
||||||
|
}
|
||||||
|
self.pages.push_back((lsn, page));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn latest(&self) -> Option<(Lsn, &PageVersion)> {
|
||||||
|
self.pages.back().map(|(lsn, page)| (*lsn, page))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Split a page history at a particular LSN.
|
||||||
|
///
|
||||||
|
/// This consumes this PageHistory and returns two new ones.
|
||||||
|
/// Any changes exactly matching the split LSN will be in the
|
||||||
|
/// "old" history.
|
||||||
|
//
|
||||||
|
// FIXME: Is this necessary? There is some debate whether "splitting"
|
||||||
|
// layers is the best design.
|
||||||
|
//
|
||||||
|
pub fn split_at(self, split_lsn: Lsn) -> (PageHistory, PageHistory) {
|
||||||
|
let mut old = PageHistory::default();
|
||||||
|
let mut new = PageHistory::default();
|
||||||
|
for (lsn, page) in self.pages {
|
||||||
|
if lsn > split_lsn {
|
||||||
|
new.push(lsn, page)
|
||||||
|
} else {
|
||||||
|
old.push(lsn, page);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(old, new)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn iter(&self) -> impl Iterator<Item = &(Lsn, PageVersion)> {
|
||||||
|
self.pages.iter()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn page_history() {
|
||||||
|
fn make_page(b: u8) -> PageVersion {
|
||||||
|
let image = vec![b; 8192].into();
|
||||||
|
PageVersion {
|
||||||
|
page_image: Some(image),
|
||||||
|
record: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut ph = PageHistory::default();
|
||||||
|
ph.push(10.into(), make_page(1));
|
||||||
|
ph.push(20.into(), make_page(2));
|
||||||
|
ph.push(30.into(), make_page(3));
|
||||||
|
|
||||||
|
let (latest_lsn, latest_image) = ph.latest().unwrap();
|
||||||
|
assert_eq!(latest_lsn, 30.into());
|
||||||
|
assert!(matches!(latest_image, PageVersion { page_image: Some(im), .. } if im[0] == 3));
|
||||||
|
|
||||||
|
let mut it = ph.iter();
|
||||||
|
assert_eq!(it.next().unwrap().0, 10.into());
|
||||||
|
assert_eq!(it.next().unwrap().0, 20.into());
|
||||||
|
assert_eq!(it.next().unwrap().0, 30.into());
|
||||||
|
assert!(it.next().is_none());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -37,6 +37,7 @@
|
|||||||
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
|
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
|
||||||
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
|
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
|
||||||
//!
|
//!
|
||||||
|
use crate::layered_repository::page_history::PageHistory;
|
||||||
use crate::layered_repository::storage_layer::{
|
use crate::layered_repository::storage_layer::{
|
||||||
Layer, PageReconstructData, PageVersion, SegmentTag,
|
Layer, PageReconstructData, PageVersion, SegmentTag,
|
||||||
};
|
};
|
||||||
@@ -236,7 +237,7 @@ pub struct SnapshotLayerInner {
|
|||||||
|
|
||||||
/// All versions of all pages in the file are are kept here.
|
/// All versions of all pages in the file are are kept here.
|
||||||
/// Indexed by block number and LSN.
|
/// Indexed by block number and LSN.
|
||||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
pages: BTreeMap<u32, PageHistory>,
|
||||||
|
|
||||||
/// `relsizes` tracks the size of the relation at different points in time.
|
/// `relsizes` tracks the size of the relation at different points in time.
|
||||||
relsizes: BTreeMap<Lsn, u32>,
|
relsizes: BTreeMap<Lsn, u32>,
|
||||||
@@ -270,6 +271,7 @@ impl Layer for SnapshotLayer {
|
|||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
reconstruct_data: &mut PageReconstructData,
|
reconstruct_data: &mut PageReconstructData,
|
||||||
) -> Result<Option<Lsn>> {
|
) -> Result<Option<Lsn>> {
|
||||||
|
/*
|
||||||
// Scan the BTreeMap backwards, starting from the given entry.
|
// Scan the BTreeMap backwards, starting from the given entry.
|
||||||
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
||||||
{
|
{
|
||||||
@@ -303,6 +305,9 @@ impl Layer for SnapshotLayer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Ok(need_base_image_lsn)
|
Ok(need_base_image_lsn)
|
||||||
|
|
||||||
|
*/
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get size of the relation at given LSN
|
/// Get size of the relation at given LSN
|
||||||
@@ -380,7 +385,7 @@ impl SnapshotLayer {
|
|||||||
start_lsn: Lsn,
|
start_lsn: Lsn,
|
||||||
end_lsn: Lsn,
|
end_lsn: Lsn,
|
||||||
dropped: bool,
|
dropped: bool,
|
||||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
pages: BTreeMap<u32, PageHistory>,
|
||||||
relsizes: BTreeMap<Lsn, u32>,
|
relsizes: BTreeMap<Lsn, u32>,
|
||||||
) -> Result<SnapshotLayer> {
|
) -> Result<SnapshotLayer> {
|
||||||
let snapfile = SnapshotLayer {
|
let snapfile = SnapshotLayer {
|
||||||
@@ -393,10 +398,12 @@ impl SnapshotLayer {
|
|||||||
dropped,
|
dropped,
|
||||||
inner: Mutex::new(SnapshotLayerInner {
|
inner: Mutex::new(SnapshotLayerInner {
|
||||||
loaded: true,
|
loaded: true,
|
||||||
page_versions: page_versions,
|
pages,
|
||||||
relsizes: relsizes,
|
relsizes,
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
let inner = snapfile.inner.lock().unwrap();
|
let inner = snapfile.inner.lock().unwrap();
|
||||||
|
|
||||||
// Write the in-memory btreemaps into a file
|
// Write the in-memory btreemaps into a file
|
||||||
@@ -426,12 +433,16 @@ impl SnapshotLayer {
|
|||||||
drop(inner);
|
drop(inner);
|
||||||
|
|
||||||
Ok(snapfile)
|
Ok(snapfile)
|
||||||
|
*/
|
||||||
|
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
///
|
///
|
||||||
/// Load the contents of the file into memory
|
/// Load the contents of the file into memory
|
||||||
///
|
///
|
||||||
fn load(&self) -> Result<MutexGuard<SnapshotLayerInner>> {
|
fn load(&self) -> Result<MutexGuard<SnapshotLayerInner>> {
|
||||||
|
/*
|
||||||
// quick exit if already loaded
|
// quick exit if already loaded
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
|
||||||
@@ -469,6 +480,9 @@ impl SnapshotLayer {
|
|||||||
};
|
};
|
||||||
|
|
||||||
Ok(inner)
|
Ok(inner)
|
||||||
|
*/
|
||||||
|
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create SnapshotLayers representing all files on disk
|
/// Create SnapshotLayers representing all files on disk
|
||||||
@@ -479,6 +493,7 @@ impl SnapshotLayer {
|
|||||||
timelineid: ZTimelineId,
|
timelineid: ZTimelineId,
|
||||||
tenantid: ZTenantId,
|
tenantid: ZTenantId,
|
||||||
) -> Result<Vec<Arc<SnapshotLayer>>> {
|
) -> Result<Vec<Arc<SnapshotLayer>>> {
|
||||||
|
/*
|
||||||
let path = conf.timeline_path(&timelineid, &tenantid);
|
let path = conf.timeline_path(&timelineid, &tenantid);
|
||||||
|
|
||||||
let mut snapfiles: Vec<Arc<SnapshotLayer>> = Vec::new();
|
let mut snapfiles: Vec<Arc<SnapshotLayer>> = Vec::new();
|
||||||
@@ -506,6 +521,8 @@ impl SnapshotLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Ok(snapfiles);
|
return Ok(snapfiles);
|
||||||
|
*/
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn delete(&self) -> Result<()> {
|
pub fn delete(&self) -> Result<()> {
|
||||||
@@ -519,11 +536,14 @@ impl SnapshotLayer {
|
|||||||
/// it will need to be loaded back.
|
/// it will need to be loaded back.
|
||||||
///
|
///
|
||||||
pub fn unload(&self) -> Result<()> {
|
pub fn unload(&self) -> Result<()> {
|
||||||
|
/*
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
inner.page_versions = BTreeMap::new();
|
inner.page_versions = BTreeMap::new();
|
||||||
inner.relsizes = BTreeMap::new();
|
inner.relsizes = BTreeMap::new();
|
||||||
inner.loaded = false;
|
inner.loaded = false;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
*/
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// debugging function to print out the contents of the layer
|
/// debugging function to print out the contents of the layer
|
||||||
|
|||||||
21
snapfile/Cargo.toml
Normal file
21
snapfile/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
[package]
|
||||||
|
name = "snapfile"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "snaptool"
|
||||||
|
path = "snaptool/main.rs"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0"
|
||||||
|
aversion = "0.2"
|
||||||
|
bookfile = "0.3"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
rand = "0.8.3"
|
||||||
|
structopt = "0.3"
|
||||||
|
zenith_utils = { path = "../zenith_utils" }
|
||||||
|
hex = "0.4.3"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tempfile = "3.2"
|
||||||
64
snapfile/snaptool/main.rs
Normal file
64
snapfile/snaptool/main.rs
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
use anyhow::{Context, Result};
|
||||||
|
use snapfile::{squash, SnapFile};
|
||||||
|
use std::env::current_dir;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use structopt::StructOpt;
|
||||||
|
|
||||||
|
#[derive(StructOpt)]
|
||||||
|
#[structopt(about = "A tool for manipulating snapshot files")]
|
||||||
|
enum Params {
|
||||||
|
Squash(Squash),
|
||||||
|
Describe(Describe),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(StructOpt)]
|
||||||
|
struct Squash {
|
||||||
|
older: PathBuf,
|
||||||
|
newer: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(StructOpt)]
|
||||||
|
struct Describe {
|
||||||
|
file: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn print_errors(error: anyhow::Error) {
|
||||||
|
let formatted: Vec<_> = error.chain().map(ToString::to_string).collect();
|
||||||
|
eprintln!("{}", formatted.join(": "));
|
||||||
|
}
|
||||||
|
fn main() {
|
||||||
|
let res = snaptool_main();
|
||||||
|
if let Err(e) = res {
|
||||||
|
print_errors(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snaptool_main() -> Result<()> {
|
||||||
|
let params = Params::from_args();
|
||||||
|
match ¶ms {
|
||||||
|
Params::Squash(squash_params) => {
|
||||||
|
let out_dir = current_dir()?;
|
||||||
|
squash(&squash_params.older, &squash_params.newer, &out_dir).with_context(|| {
|
||||||
|
format!(
|
||||||
|
"squash {} {}",
|
||||||
|
squash_params.older.to_string_lossy(),
|
||||||
|
squash_params.newer.to_string_lossy()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
Params::Describe(describe_params) => {
|
||||||
|
describe(describe_params)
|
||||||
|
.with_context(|| format!("describe {}", describe_params.file.to_string_lossy()))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn describe(params: &Describe) -> Result<()> {
|
||||||
|
let mut snap = SnapFile::new(¶ms.file)?;
|
||||||
|
let meta = snap.read_meta()?;
|
||||||
|
|
||||||
|
println!("{:?}: {:#?}", params.file, meta);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
339
snapfile/src/lib.rs
Normal file
339
snapfile/src/lib.rs
Normal file
@@ -0,0 +1,339 @@
|
|||||||
|
//! A file format for storage a snapshot of pages.
|
||||||
|
|
||||||
|
#![warn(missing_docs)]
|
||||||
|
#![forbid(unsafe_code)]
|
||||||
|
#![warn(clippy::cast_possible_truncation)]
|
||||||
|
|
||||||
|
mod page;
|
||||||
|
mod squash;
|
||||||
|
mod versioned;
|
||||||
|
|
||||||
|
#[doc(inline)]
|
||||||
|
pub use page::Page;
|
||||||
|
|
||||||
|
#[doc(inline)]
|
||||||
|
pub use squash::squash;
|
||||||
|
|
||||||
|
use anyhow::{bail, Context, Result};
|
||||||
|
use aversion::group::{DataSink, DataSourceExt};
|
||||||
|
use aversion::util::cbor::CborData;
|
||||||
|
use bookfile::{Book, BookWriter, ChapterWriter};
|
||||||
|
use std::ffi::OsString;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::ops::AddAssign;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
pub use versioned::{PageIndex, PageLocation, Predecessor, SnapFileMeta};
|
||||||
|
use zenith_utils::lsn::Lsn;
|
||||||
|
|
||||||
|
impl SnapFileMeta {
|
||||||
|
pub fn new(previous: Option<SnapFileMeta>, timeline: [u8; 16], lsn: Lsn) -> Self {
|
||||||
|
// Store the metadata of the predecessor snapshot, if there is one.
|
||||||
|
let predecessor = previous.map(|prev| Predecessor {
|
||||||
|
timeline: prev.timeline,
|
||||||
|
lsn: prev.lsn,
|
||||||
|
});
|
||||||
|
|
||||||
|
SnapFileMeta {
|
||||||
|
timeline,
|
||||||
|
predecessor,
|
||||||
|
lsn: lsn.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_filename(&self) -> OsString {
|
||||||
|
let timeline_string = hex::encode(self.timeline);
|
||||||
|
let pred_lsn = match &self.predecessor {
|
||||||
|
None => 0,
|
||||||
|
Some(pred) => pred.lsn,
|
||||||
|
};
|
||||||
|
format!("{}_{:x}_{:x}.zdb", timeline_string, pred_lsn, self.lsn).into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PageIndex {
|
||||||
|
/// Retrieve the page offset from the index.
|
||||||
|
///
|
||||||
|
/// If the page is not in the index, returns `None`.
|
||||||
|
fn get_page_location(&self, page_num: u64) -> Option<PageLocation> {
|
||||||
|
self.map.get(&page_num).copied()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn page_count(&self) -> usize {
|
||||||
|
self.map.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PageLocation {
|
||||||
|
fn to_offset(&self) -> u64 {
|
||||||
|
// Counts in units of one page.
|
||||||
|
self.0 * 8192
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AddAssign<u64> for PageLocation {
|
||||||
|
fn add_assign(&mut self, rhs: u64) {
|
||||||
|
self.0 += rhs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A read-only snapshot file.
|
||||||
|
pub struct SnapFile {
|
||||||
|
book: Book<File>,
|
||||||
|
page_index: PageIndex,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SnapFile {
|
||||||
|
/// Open a new `SnapFile` for reading.
|
||||||
|
///
|
||||||
|
/// This call will validate some of the file's format and read the file's
|
||||||
|
/// metadata; it may return an error if the file format is invalid.
|
||||||
|
pub fn new(path: &Path) -> Result<Self> {
|
||||||
|
let file =
|
||||||
|
File::open(path).with_context(|| format!("snapfile {}", path.to_string_lossy()))?;
|
||||||
|
let book = Book::new(file)?;
|
||||||
|
if book.magic() != versioned::SNAPFILE_MAGIC {
|
||||||
|
bail!("bad magic number");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the page index into memory.
|
||||||
|
let chapter_reader = book
|
||||||
|
.chapter_reader(versioned::CHAPTER_PAGE_INDEX)
|
||||||
|
.context("snapfile missing index chapter")?;
|
||||||
|
let mut source = CborData::new(chapter_reader);
|
||||||
|
let page_index: PageIndex = source.expect_message()?;
|
||||||
|
Ok(SnapFile { book, page_index })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the snapshot metadata.
|
||||||
|
pub fn read_meta(&mut self) -> Result<SnapFileMeta> {
|
||||||
|
let chapter_reader = self
|
||||||
|
.book
|
||||||
|
.chapter_reader(versioned::CHAPTER_SNAP_META)
|
||||||
|
.context("snapfile missing meta")?;
|
||||||
|
let mut source = CborData::new(chapter_reader);
|
||||||
|
let meta: SnapFileMeta = source.expect_message()?;
|
||||||
|
Ok(meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the number of pages stored in this snapshot.
|
||||||
|
pub fn page_count(&self) -> usize {
|
||||||
|
self.page_index.page_count()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if a page exists in this snapshot's index.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the given page is stored in this snapshot file,
|
||||||
|
/// `false` if not.
|
||||||
|
pub fn has_page(&self, page_num: u64) -> bool {
|
||||||
|
self.page_index.get_page_location(page_num).is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a page.
|
||||||
|
///
|
||||||
|
/// If this returns Ok(None), that means that this file does not store
|
||||||
|
/// the requested page.
|
||||||
|
/// This should only fail (returning `Err`) if an IO error occurs.
|
||||||
|
pub fn read_page(&self, page_num: u64) -> Result<Option<Page>> {
|
||||||
|
match self.page_index.get_page_location(page_num) {
|
||||||
|
None => Ok(None),
|
||||||
|
Some(page_offset) => Ok(Some(self._read_page(page_offset)?)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read page data from the file.
|
||||||
|
///
|
||||||
|
/// This does the work for read_page and PageIter.
|
||||||
|
fn _read_page(&self, page_location: PageLocation) -> Result<Page> {
|
||||||
|
// Compute the true byte offset in the file.
|
||||||
|
let page_offset = page_location.to_offset();
|
||||||
|
let chapter_reader = self
|
||||||
|
.book
|
||||||
|
.chapter_reader(versioned::CHAPTER_PAGES)
|
||||||
|
.context("snapfile missing pages chapter")?;
|
||||||
|
|
||||||
|
let mut page_data = Page::default();
|
||||||
|
let bytes_read = chapter_reader.read_at(page_data.as_mut(), page_offset)?;
|
||||||
|
if bytes_read != 8192 {
|
||||||
|
bail!("read truncated page");
|
||||||
|
}
|
||||||
|
Ok(page_data)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Iterate over pages.
|
||||||
|
///
|
||||||
|
/// This will return an iterator over (usize, )
|
||||||
|
pub fn all_pages(&self) -> PageIter {
|
||||||
|
let inner = (&self.page_index.map).into_iter();
|
||||||
|
PageIter {
|
||||||
|
snapfile: self,
|
||||||
|
inner,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An iterator over all pages in the snapshot file.
pub struct PageIter<'a> {
    // The snapshot file to read page contents from.
    snapfile: &'a SnapFile,
    // Iterator over the in-memory page index (page number -> location).
    inner: std::collections::btree_map::Iter<'a, u64, PageLocation>,
}
|
||||||
|
|
||||||
|
impl Iterator for PageIter<'_> {
|
||||||
|
type Item = Result<(u64, Page)>;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
let (page_num, page_offset) = self.inner.next()?;
|
||||||
|
let result = self
|
||||||
|
.snapfile
|
||||||
|
._read_page(*page_offset)
|
||||||
|
.map(|page_data| (*page_num, page_data));
|
||||||
|
Some(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `SnapWriter` creates a new snapshot file.
///
/// A SnapWriter is created, has pages written into it, and is then closed.
pub struct SnapWriter {
    // Writer for the currently open pages chapter.
    writer: ChapterWriter<File>,
    // Index of the pages written so far; serialized into its own chapter
    // when the writer is finished.
    page_index: PageIndex,
    // Metadata written at creation time; returned again by `finish`.
    meta: SnapFileMeta,
    // Location (in pages, not bytes) where the next page will be written.
    current_offset: PageLocation,
}
|
||||||
|
|
||||||
|
impl SnapWriter {
|
||||||
|
/// Create a new `SnapWriter`.
|
||||||
|
///
|
||||||
|
pub fn new(dir: &Path, meta: SnapFileMeta) -> Result<Self> {
|
||||||
|
let mut path = PathBuf::from(dir);
|
||||||
|
path.push(meta.to_filename());
|
||||||
|
let file = File::create(path)?;
|
||||||
|
let book = BookWriter::new(file, versioned::SNAPFILE_MAGIC)?;
|
||||||
|
|
||||||
|
// Write a chapter for the snapshot metadata.
|
||||||
|
let writer = book.new_chapter(versioned::CHAPTER_SNAP_META);
|
||||||
|
let mut sink = CborData::new(writer);
|
||||||
|
sink.write_message(&meta)?;
|
||||||
|
let book = sink.into_inner().close()?;
|
||||||
|
|
||||||
|
// Open a new chapter for raw page data.
|
||||||
|
let writer = book.new_chapter(versioned::CHAPTER_PAGES);
|
||||||
|
Ok(SnapWriter {
|
||||||
|
writer,
|
||||||
|
page_index: PageIndex::default(),
|
||||||
|
meta,
|
||||||
|
current_offset: PageLocation::default(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write a page into the snap file.
|
||||||
|
pub fn write_page<P>(&mut self, page_num: u64, page_data: P) -> Result<()>
|
||||||
|
where
|
||||||
|
P: Into<Page>,
|
||||||
|
{
|
||||||
|
let page_data: Page = page_data.into();
|
||||||
|
self.writer.write_all(page_data.as_ref())?;
|
||||||
|
let prev = self.page_index.map.insert(page_num, self.current_offset);
|
||||||
|
if prev.is_some() {
|
||||||
|
panic!("duplicate index for page {}", page_num);
|
||||||
|
}
|
||||||
|
self.current_offset += 1;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Finish writing pages.
|
||||||
|
///
|
||||||
|
/// This consumes the PagesWriter and completes the snapshot.
|
||||||
|
//
|
||||||
|
pub fn finish(self) -> Result<SnapFileMeta> {
|
||||||
|
let book = self.writer.close()?;
|
||||||
|
|
||||||
|
// Write out a page index and close the book. This will write out any
|
||||||
|
// necessary file metadata.
|
||||||
|
// FIXME: these 3 lines could be combined into a single function
|
||||||
|
// that means "serialize this data structure with this format into this chapter".
|
||||||
|
let writer = book.new_chapter(versioned::CHAPTER_PAGE_INDEX);
|
||||||
|
let mut sink = CborData::new(writer);
|
||||||
|
sink.write_message(&self.page_index)?;
|
||||||
|
|
||||||
|
// Close the chapter, then close the book.
|
||||||
|
sink.into_inner().close()?.close()?;
|
||||||
|
|
||||||
|
// Return the snapshot metadata to the caller.
|
||||||
|
Ok(self.meta)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use tempfile::TempDir;

    // An arbitrary timeline id shared by all tests in this module.
    const TEST_TIMELINE: [u8; 16] = [99u8; 16];

    #[test]
    fn snap_two_pages() {
        // When `dir` goes out of scope the directory will be unlinked.
        let dir = TempDir::new().unwrap();
        let snap_meta = {
            // Write out a new snapshot file with two pages.
            let meta = SnapFileMeta::new(None, TEST_TIMELINE, Lsn(1234));
            let mut snap = SnapWriter::new(dir.path(), meta).unwrap();
            // Write the pages out of order, because why not?
            let page99 = [99u8; 8192];
            snap.write_page(99, page99).unwrap();
            let page33 = [33u8; 8192];
            snap.write_page(33, page33).unwrap();
            snap.finish().unwrap()
        };

        // `finish` hands back the metadata that was written.
        assert_eq!(snap_meta.lsn, 1234);

        {
            // Read the snapshot file and verify the contents.
            let mut path = PathBuf::from(dir.path());
            path.push(snap_meta.to_filename());
            let mut snap = SnapFile::new(&path).unwrap();

            // Both pages should be present; neighbors should not.
            assert_eq!(snap.page_count(), 2);
            assert!(!snap.has_page(0));
            assert!(snap.has_page(33));
            assert!(!snap.has_page(98));
            assert!(snap.has_page(99));
            assert!(snap.read_page(0).unwrap().is_none());
            let page = snap.read_page(33).unwrap().unwrap();
            assert_eq!(*page.0, [33u8; 8192]);
            let page = snap.read_page(99).unwrap().unwrap();
            assert_eq!(*page.0, [99u8; 8192]);

            // Make sure the deserialized metadata matches what we think we wrote.
            let meta2 = snap.read_meta().unwrap();
            assert_eq!(snap_meta, meta2);
        }
    }

    #[test]
    fn snap_zero_pages() {
        // When `dir` goes out of scope the directory will be unlinked.
        let dir = TempDir::new().unwrap();
        let snap_meta = {
            // Write out a new snapshot file with no pages.
            let meta = SnapFileMeta::new(None, TEST_TIMELINE, Lsn(1234));
            let snap = SnapWriter::new(dir.path(), meta).unwrap();
            snap.finish().unwrap()
        };

        {
            // Read the snapshot file.
            let mut path = PathBuf::from(dir.path());
            path.push(snap_meta.to_filename());
            let snap = SnapFile::new(&path).unwrap();
            // An empty snapshot has an empty page index and stores nothing.
            assert_eq!(snap.page_index.page_count(), 0);
            assert!(!snap.has_page(0));
            assert!(!snap.has_page(99));
            assert!(snap.read_page(0).unwrap().is_none());
            assert!(snap.read_page(99).unwrap().is_none());
        }
    }
}
|
||||||
42
snapfile/src/page.rs
Normal file
42
snapfile/src/page.rs
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
/// A single 8KB page.
///
/// The bytes live on the heap (`Box`) so that a `Page` value itself stays
/// pointer-sized and cheap to move.
pub struct Page(pub Box<[u8; 8192]>);
|
||||||
|
|
||||||
|
impl Page {
|
||||||
|
/// Create a page by copying bytes from another slice.
|
||||||
|
///
|
||||||
|
/// This is a copy, not a move. If the caller already has
|
||||||
|
/// an owned array then `From<[u8; 8192]>` can be used instead.
|
||||||
|
pub fn copy_slice(x: &[u8; 8192]) -> Self {
|
||||||
|
Page(Box::new(x.clone()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Page {
    /// An all-zero page.
    fn default() -> Self {
        Page(Box::new([0u8; 8192]))
    }
}
|
||||||
|
|
||||||
|
impl From<[u8; 8192]> for Page {
    /// Move an owned array onto the heap as a `Page`.
    fn from(array: [u8; 8192]) -> Self {
        Page(Box::new(array))
    }
}
|
||||||
|
|
||||||
|
impl From<Box<[u8; 8192]>> for Page {
    /// Wrap an already heap-allocated array without copying.
    fn from(heap_array: Box<[u8; 8192]>) -> Self {
        Page(heap_array)
    }
}
|
||||||
|
|
||||||
|
impl AsRef<[u8; 8192]> for Page {
    /// Borrow the page contents.
    fn as_ref(&self) -> &[u8; 8192] {
        self.0.as_ref()
    }
}
|
||||||
|
|
||||||
|
impl AsMut<[u8; 8192]> for Page {
    /// Mutably borrow the page contents.
    fn as_mut(&mut self) -> &mut [u8; 8192] {
        self.0.as_mut()
    }
}
|
||||||
100
snapfile/src/squash.rs
Normal file
100
snapfile/src/squash.rs
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
use crate::{Page, PageIter, SnapFile, SnapFileMeta, SnapWriter};
|
||||||
|
use anyhow::{bail, Result};
|
||||||
|
use std::cmp::Ordering;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
// A helper struct that holds an iterator, along with the last
// value taken from the iterator (a one-item lookahead).
struct PageStepper<'a> {
    // The underlying page iterator.
    it: PageIter<'a>,
    // The most recently read item, or `None` once the iterator is exhausted.
    pub cache: Option<(u64, Page)>,
}
|
||||||
|
|
||||||
|
impl<'a> PageStepper<'a> {
|
||||||
|
fn new(snapfile: &'a SnapFile) -> Result<Self> {
|
||||||
|
let mut it = snapfile.all_pages();
|
||||||
|
let cache = it.next().transpose()?;
|
||||||
|
Ok(PageStepper { it, cache })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a new page from the iterator, returning the previous page.
|
||||||
|
fn step(&mut self) -> Result<Option<(u64, Page)>> {
|
||||||
|
let mut next = self.it.next().transpose()?;
|
||||||
|
std::mem::swap(&mut self.cache, &mut next);
|
||||||
|
Ok(next)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Squash two snapshot files into one.
|
||||||
|
///
|
||||||
|
/// The resulting snapshot will contain all of the pages from both files.
|
||||||
|
/// If the same page number is stored in both, it will keep the page from
|
||||||
|
/// the newer snapshot.
|
||||||
|
///
|
||||||
|
/// The name of the resulting file will be automatically generated from
|
||||||
|
/// the snapshot metadata.
|
||||||
|
pub fn squash(older: &Path, newer: &Path, out_dir: &Path) -> Result<()> {
|
||||||
|
let mut snap1 = SnapFile::new(older)?;
|
||||||
|
let mut snap2 = SnapFile::new(newer)?;
|
||||||
|
|
||||||
|
let meta1 = snap1.read_meta()?;
|
||||||
|
let meta2 = snap2.read_meta()?;
|
||||||
|
|
||||||
|
// Check that snap1 is the predecessor of snap2.
|
||||||
|
match meta2.predecessor {
|
||||||
|
Some(pred) if pred.timeline == meta1.timeline => {}
|
||||||
|
_ => {
|
||||||
|
bail!(
|
||||||
|
"snap file {:?} is not the predecessor of {:?}",
|
||||||
|
&older,
|
||||||
|
&newer,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The new combined snapshot will have most fields from meta2 (the later
|
||||||
|
// snapshot), but will have the predecessor from meta1.
|
||||||
|
let new_meta = SnapFileMeta {
|
||||||
|
// There is some danger in squashing snapshots across two timelines,
|
||||||
|
// in that it's possible to get confused about what the history
|
||||||
|
// looks like. Ultimately, it should be possible to squash our way
|
||||||
|
// to a "complete" snapshot (that contains all pages), so this must
|
||||||
|
// be possible.
|
||||||
|
timeline: meta2.timeline,
|
||||||
|
predecessor: meta1.predecessor,
|
||||||
|
lsn: meta2.lsn,
|
||||||
|
};
|
||||||
|
let mut snap_writer = SnapWriter::new(&out_dir, new_meta)?;
|
||||||
|
|
||||||
|
let mut iter1 = PageStepper::new(&snap1)?;
|
||||||
|
let mut iter2 = PageStepper::new(&snap2)?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let next_page = match (&iter1.cache, &iter2.cache) {
|
||||||
|
(None, None) => break,
|
||||||
|
(Some(_), None) => iter1.step()?,
|
||||||
|
(None, Some(_)) => iter2.step()?,
|
||||||
|
(Some(x), Some(y)) => {
|
||||||
|
// If these are two different page numbers, then advance the iterator
|
||||||
|
// with the numerically lower number.
|
||||||
|
// If they are the same page number, then store the one from the newer
|
||||||
|
// snapshot, and discard the other (advancing both iterators).
|
||||||
|
match x.0.cmp(&y.0) {
|
||||||
|
Ordering::Less => iter1.step()?,
|
||||||
|
Ordering::Greater => iter2.step()?,
|
||||||
|
Ordering::Equal => {
|
||||||
|
let _ = iter1.step()?;
|
||||||
|
iter2.step()?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// This can't be None, because we would already checked inside the match
|
||||||
|
// statement.
|
||||||
|
let (page_num, page_data) = next_page.unwrap();
|
||||||
|
snap_writer.write_page(page_num, page_data)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
snap_writer.finish()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
88
snapfile/src/versioned.rs
Normal file
88
snapfile/src/versioned.rs
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
//! Versioned data structures for snapshot files
|
||||||
|
//!
|
||||||
|
//! To ensure that future versions of software can read snapshot files,
|
||||||
|
//! all data structures that are serialized into the snapshot files should
|
||||||
|
//! live in this module.
|
||||||
|
//!
|
||||||
|
//! Once released, versioned data structures should never be modified.
|
||||||
|
//! Instead, new versions should be created and conversion functions should
|
||||||
|
//! be defined using the `FromVersion` trait.
|
||||||
|
|
||||||
|
use aversion::{assign_message_ids, UpgradeLatest, Versioned};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
// A random constant, to identify this file type.
pub(crate) const SNAPFILE_MAGIC: u32 = 0x7fb8_38a8;

// Constant chapter numbers. Chapters inside the bookfile container are
// keyed by these u64 values.
// FIXME: the bookfile crate should use something better to index, e.g. strings.

/// Snapshot-specific file metadata
pub(crate) const CHAPTER_SNAP_META: u64 = 1;
/// A packed set of 8KB pages.
pub(crate) const CHAPTER_PAGES: u64 = 2;
/// An index of pages.
pub(crate) const CHAPTER_PAGE_INDEX: u64 = 3;
|
||||||
|
|
||||||
|
/// Information about the predecessor snapshot.
///
/// It contains the snap_id of the predecessor snapshot, and the LSN
/// of that snapshot.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Predecessor {
    /// This is the ID number of the predecessor timeline.
    ///
    /// This may match the current snapshot's timeline id, but
    /// it may not (if the predecessor was the branch point).
    pub timeline: [u8; 16],

    /// This is the LSN of the predecessor snapshot.
    pub lsn: u64,
}
|
||||||
|
|
||||||
|
/// Snapshot file metadata (version 1).
///
/// Per the module docs, this serialized structure must not be modified
/// once released; add a new version instead.
#[derive(Debug, PartialEq, Serialize, Deserialize, Versioned, UpgradeLatest)]
pub struct SnapFileMetaV1 {
    /// This is a unique ID number for this timeline.
    ///
    /// This number guarantees that snapshot history is unique.
    pub timeline: [u8; 16],

    /// Information about the predecessor snapshot.
    ///
    /// If `None`, this snapshot is the start of a new database.
    pub predecessor: Option<Predecessor>,

    /// This is the last LSN stored in this snapshot.
    pub lsn: u64,
}

/// A type alias for the latest version of `SnapFileMeta`.
pub type SnapFileMeta = SnapFileMetaV1;
|
||||||
|
|
||||||
|
/// A page location within a file.
///
/// Note: this is an opaque value that may not be the true byte offset;
/// it may be relative to some other location or measured in units other
/// than bytes.
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(transparent)]
pub struct PageLocationV1(pub(crate) u64);

/// A type alias for the latest version of `PageLocation`.
pub type PageLocation = PageLocationV1;
|
||||||
|
|
||||||
|
/// An index from page number to offset within the pages chapter.
#[derive(Debug, Default, Serialize, Deserialize, Versioned, UpgradeLatest)]
pub struct PageIndexV1 {
    /// A map from page number to file offset.
    /// A `BTreeMap` keeps iteration in ascending page-number order, which
    /// the squash sorted-merge relies on.
    pub(crate) map: BTreeMap<u64, PageLocationV1>,
}

/// A type alias for the latest version of `PageIndex`.
pub type PageIndex = PageIndexV1;
|
||||||
|
|
||||||
|
// Each message gets a unique message id, for tracking by the aversion traits.
// NOTE(review): these ids appear to be part of the serialized file format;
// confirm that changing or reusing one would break existing files before
// editing this list.
assign_message_ids! {
    PageIndex: 100,
    SnapFileMeta: 101,
}
|
||||||
Reference in New Issue
Block a user