Add import/export functions from buffered storage to/from layer files

This commit is contained in:
Konstantin Knizhnik
2021-11-10 09:48:28 +03:00
parent 9947de4a2a
commit 3b471494ff
12 changed files with 393 additions and 141 deletions

4
Cargo.lock generated
View File

@@ -2571,7 +2571,9 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.1.8"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9e0d4aaa804d3b7acae0ee0e3c28999dd590a99391f784e18cd037ed9191655"
dependencies = [
"anyhow",
"crc32c",

View File

@@ -38,7 +38,7 @@ const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
#yakv = { path = "../../yakv" }
yakv = "0.1.8"
yakv = "0.1.9"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -33,7 +33,10 @@ use std::time::{Duration, Instant};
use crate::relish::*;
use crate::relish_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord};
use crate::repository::{
GcResult, PageReconstructData, PageReconstructResult, PageVersion, Repository, Timeline,
TimelineWriter, WALRecord,
};
use crate::tenant_mgr;
use crate::toast_store::ToastStore;
use crate::walreceiver;
@@ -42,12 +45,18 @@ use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use crate::layered_repository::delta_layer::DeltaLayer;
use crate::layered_repository::filename;
use crate::layered_repository::image_layer::ImageLayer;
use crate::layered_repository::storage_layer::{Layer, SegmentTag, RELISH_SEG_SIZE};
use zenith_metrics::{register_histogram, register_int_gauge_vec, Histogram, IntGaugeVec};
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::crashsafe_dir;
use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn};
use zenith_utils::seqwait::SeqWait;
use zenith_utils::vec_map::VecMap;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
@@ -166,18 +175,6 @@ struct RelishStore {
meta: Option<HashMap<RelishTag, MetadataSnapshot>>,
}
///
/// Data needed to reconstruct a page version
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
///
struct PageReconstructData {
records: Vec<(Lsn, WALRecord)>,
page_img: Option<Bytes>,
}
/// Public interface
impl Repository for BufferedRepository {
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
@@ -787,8 +784,8 @@ impl Timeline for BufferedTimeline {
if let StoreKey::Data(dk) = key {
let ver = PageVersion::des(&pair.1)?;
match ver {
PageVersion::Image(img) => Ok(img), // already materialized: we are done
PageVersion::Delta(rec) => {
PageVersion::Page(img) => Ok(img), // already materialized: we are done
PageVersion::Wal(rec) => {
let mut will_init = rec.will_init;
let mut data = PageReconstructData {
records: Vec::new(),
@@ -806,11 +803,11 @@ impl Timeline for BufferedTimeline {
if let StoreKey::Data(dk) = key {
assert!(dk.rel == rel); // check that we don't jump to previous relish before locating full image
match ver {
PageVersion::Image(img) => {
PageVersion::Page(img) => {
data.page_img = Some(img);
break;
}
PageVersion::Delta(rec) => {
PageVersion::Wal(rec) => {
will_init = rec.will_init;
data.records.push((dk.lsn, rec));
}
@@ -926,6 +923,191 @@ impl Timeline for BufferedTimeline {
.observe_closure_duration(|| self.checkpoint_internal(0, true))
}
///
/// Export data as delta and image layers between 'start_lsn' and 'end_lsn'.
/// The start is inclusive, and end is exclusive.
///
/// Pass 1 scans metadata records to collect each relish's size history and
/// the set of dropped relishes. Pass 2 scans page versions in key order and,
/// for every segment touched in the range, writes a delta layer with the
/// collected versions plus (for live relishes) an image layer materialized
/// at 'end_lsn'.
///
fn export_timeline(&self, start_lsn: Lsn, end_lsn: Lsn) -> Result<()> {
    let now = Instant::now();
    let _enter = info_span!("export timeline", timeline = %self.timelineid, tenant = %self.tenantid, start_lsn = %start_lsn, end_lsn = %end_lsn).entered();
    info!("exporting timeline");

    let zero_rel = RelishTag::Relation(ZERO_TAG);

    // Pass 1: collect per-relish size history and dropped relishes from the
    // metadata keyspace.
    let mut from_rel = zero_rel;
    let mut from = StoreKey::Metadata(MetadataKey {
        rel: from_rel,
        lsn: start_lsn,
    });
    let mut relsizes: HashMap<RelishTag, VecMap<Lsn, u32>> = HashMap::new();
    let mut dropped: HashSet<RelishTag> = HashSet::new();
    let store = self.store.read().unwrap();
    'meta: loop {
        for entry in store.data.range(&from.ser()?..) {
            let pair = entry?;
            if let StoreKey::Metadata(dk) = StoreKey::des(&pair.0)? {
                from_rel = dk.rel;
                if dk.lsn < start_lsn {
                    // Version predates the range: seek to this relish's first
                    // version at or after start_lsn.
                    from = StoreKey::Metadata(MetadataKey {
                        rel: from_rel,
                        lsn: start_lsn,
                    });
                    continue 'meta;
                } else if dk.lsn >= end_lsn {
                    // Version is past the range: skip to the next relish.
                    from = StoreKey::Metadata(MetadataKey {
                        rel: from_rel,
                        lsn: Lsn::MAX,
                    });
                    continue 'meta;
                } else {
                    let meta = MetadataValue::des(&pair.1)?;
                    if let Some(size) = meta.size {
                        relsizes
                            .entry(dk.rel)
                            .or_default()
                            .append(dk.lsn, size)
                            .unwrap();
                    } else {
                        // A metadata record without a size marks a dropped relish.
                        dropped.insert(dk.rel);
                    }
                }
            } else {
                // First non-metadata key: end of the metadata keyspace.
                break 'meta;
            }
        }
        break;
    }

    // Writes the layers for one finished segment: a delta layer with the
    // accumulated page versions (if any) and, unless the relish was dropped,
    // an image layer materialized at end_lsn.
    let flush_segment = |page_versions: &mut Vec<(u32, Lsn, PageVersion)>,
                         rel: RelishTag,
                         blknum: u32|
     -> Result<()> {
        let is_dropped = dropped.contains(&rel);
        let segtag = SegmentTag::from_blknum(rel, blknum);
        if !page_versions.is_empty() {
            DeltaLayer::create(
                self.conf,
                self.timelineid,
                self.tenantid,
                segtag,
                start_lsn,
                end_lsn,
                is_dropped,
                page_versions.iter().map(|t| (t.0, t.1, &t.2)),
                // A relish can have page versions in range without a size
                // record in range; fall back to an empty size map rather than
                // panicking on a missing key.
                relsizes.get(&rel).cloned().unwrap_or_default(),
            )?;
            page_versions.clear();
        }
        if !is_dropped {
            // First block of the segment. NB: the previous
            // `blknum & !RELISH_SEG_SIZE` only cleared one bit; rounding down
            // via division is correct for any segment size.
            let first_blknum = (blknum / RELISH_SEG_SIZE) * RELISH_SEG_SIZE;
            let last_blknum = u32::min(
                first_blknum + RELISH_SEG_SIZE,
                relsizes
                    .get(&rel)
                    .and_then(|sizes| sizes.last())
                    .map(|p| p.1)
                    .unwrap_or(0),
            );
            if first_blknum < last_blknum {
                let mut images: Vec<Bytes> =
                    Vec::with_capacity((last_blknum - first_blknum) as usize);
                for blk in first_blknum..last_blknum {
                    images.push(self.get_page_at_lsn(rel, blk, end_lsn)?);
                }
                ImageLayer::create(
                    self.conf,
                    self.timelineid,
                    self.tenantid,
                    segtag,
                    end_lsn,
                    images,
                )?;
            }
        }
        Ok(())
    };

    // Pass 2: walk the data keyspace, accumulating page versions per segment.
    from_rel = zero_rel;
    from = StoreKey::Data(DataKey {
        rel: from_rel,
        blknum: 0,
        lsn: Lsn(0),
    });
    // Block number of the most recently seen data key.
    let mut from_blknum = 0;
    let mut page_versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
    'pages: loop {
        for entry in store.data.range(&from.ser()?..) {
            let pair = entry?;
            if let StoreKey::Data(dk) = StoreKey::des(&pair.0)? {
                // BUG FIX: was `<`, which never holds while scanning keys in
                // ascending order, so the flush below fired on every key.
                let same_seg = from_rel == dk.rel
                    && dk.blknum / RELISH_SEG_SIZE == from_blknum / RELISH_SEG_SIZE;
                if !same_seg && from_rel != zero_rel {
                    // Crossed a segment boundary: emit layers for the segment
                    // we just finished.
                    flush_segment(&mut page_versions, from_rel, from_blknum)?;
                }
                from_rel = dk.rel;
                from_blknum = dk.blknum;
                if dk.lsn < start_lsn {
                    // Too old: seek to this block's first version in range.
                    from = StoreKey::Data(DataKey {
                        rel: from_rel,
                        blknum: from_blknum,
                        lsn: start_lsn,
                    });
                } else if dk.lsn >= end_lsn {
                    // BUG FIX: was `dk.lsn >= start_lsn`, which together with
                    // the branch above is exhaustive and made the collecting
                    // branch below unreachable (delta layers were always
                    // empty). Past the range: seek to the next block.
                    from = StoreKey::Data(DataKey {
                        rel: from_rel,
                        blknum: from_blknum + 1,
                        lsn: start_lsn,
                    });
                } else {
                    // In range: remember this version for the delta layer.
                    page_versions.push((dk.blknum, dk.lsn, PageVersion::des(&pair.1)?));
                    continue;
                }
                continue 'pages;
            } else {
                break 'pages;
            }
        }
        break;
    }
    // Flush the trailing segment, if any data keys were seen at all.
    if from_rel != zero_rel {
        flush_segment(&mut page_versions, from_rel, from_blknum)?;
    }
    info!("Export time line in {:?}", now.elapsed());
    Ok(())
}
/// Return the LSN of the last WAL record processed on this timeline
/// (the `last` component of the atomically tracked RecordLsn pair).
fn get_last_record_lsn(&self) -> Lsn {
    self.last_record_lsn.load().last
}
@@ -1213,7 +1395,7 @@ impl BufferedTimeline {
debug_assert!(key < till);
if let StoreKey::Data(dk) = StoreKey::des(&key)? {
let ver = PageVersion::des(&pair.1)?;
if let PageVersion::Delta(rec) = ver {
if let PageVersion::Wal(rec) = ver {
// ignore already materialized pages
let mut will_init = rec.will_init;
let mut data = PageReconstructData {
@@ -1233,11 +1415,11 @@ impl BufferedTimeline {
if let StoreKey::Data(dk2) = key {
assert!(dk.rel == dk2.rel); // check that we don't jump to previous relish before locating full image
match ver {
PageVersion::Image(img) => {
PageVersion::Page(img) => {
data.page_img = Some(img);
break;
}
PageVersion::Delta(rec) => {
PageVersion::Wal(rec) => {
will_init = rec.will_init;
history_len += rec.rec.len();
data.records.push((dk2.lsn, rec));
@@ -1260,7 +1442,7 @@ impl BufferedTimeline {
});
let mut store = self.store.write().unwrap();
store.data.put(&key, &PageVersion::Image(img?).ser()?)?;
store.data.put(&key, &PageVersion::Page(img?).ser()?)?;
n_checkpointed_records += 1;
}
}
@@ -1339,7 +1521,8 @@ impl BufferedTimeline {
info!("GC starting");
let mut from_rel = RelishTag::Relation(ZERO_TAG);
let zero_rel = RelishTag::Relation(ZERO_TAG);
let mut from_rel = zero_rel;
let mut from = StoreKey::Metadata(MetadataKey {
rel: from_rel,
lsn: Lsn(0),
@@ -1414,7 +1597,7 @@ impl BufferedTimeline {
break;
}
from_rel = RelishTag::Relation(ZERO_TAG);
from_rel = zero_rel;
from = StoreKey::Data(DataKey {
rel: from_rel,
blknum: 0,
@@ -1454,7 +1637,7 @@ impl BufferedTimeline {
// .. and have something to remove
// ... and have page image
let ver = PageVersion::des(&pair.1)?;
if let PageVersion::Image(_) = ver {
if let PageVersion::Page(_) = ver {
// ... then remove all previously accumulated deltas and images, as they are not needed any more
drop(store);
let mut store = self.store.write().unwrap();
@@ -1578,14 +1761,6 @@ impl Deref for BufferedTimelineWriter<'_> {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PageVersion {
/// an 8kb page image
Image(Bytes),
/// WAL record to get from previous page version to this one.
Delta(WALRecord),
}
impl<'a> BufferedTimelineWriter<'a> {
fn put_page_version(
&self,
@@ -1645,12 +1820,63 @@ impl<'a> BufferedTimelineWriter<'a> {
}
impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
///
/// Import data from layer files
///
/// Replays layer files found via `filename::list_files` into the buffered
/// store: image layers taken exactly at 'snapshot_lsn' are materialized page
/// by page, and delta layers starting at or after 'snapshot_lsn' are replayed
/// as individual page versions.
///
fn import_timeline(&self, snapshot_lsn: Lsn) -> Result<()> {
    let now = Instant::now();
    // Enumerate image and delta layer files in this timeline's directory.
    let (imgfilenames, deltafilenames) =
        filename::list_files(self.tl.conf, self.tl.timelineid, self.tl.tenantid)?;
    // Scratch buffer reused for each page reconstruction.
    let mut data = PageReconstructData {
        records: Vec::new(),
        page_img: None,
    };
    for filename in &imgfilenames {
        // Only image layers taken exactly at the snapshot LSN are imported.
        if filename.lsn == snapshot_lsn {
            let layer =
                ImageLayer::new(self.tl.conf, self.tl.timelineid, self.tl.tenantid, filename);
            let seg_size = layer.get_seg_size(snapshot_lsn)?;
            for blknum in 0..seg_size {
                match layer.get_page_reconstruct_data(blknum, snapshot_lsn, &mut data)? {
                    PageReconstructResult::Complete => {
                        if let Some(page) = data.page_img.take() {
                            // Store the materialized page; segno is scaled to
                            // an absolute block number within the relish.
                            self.put_page_image(
                                filename.seg.rel,
                                filename.seg.segno * RELISH_SEG_SIZE + blknum,
                                snapshot_lsn,
                                page,
                            )?;
                        }
                    }
                    // Continue would mean the image layer refers to an
                    // ancestor layer, which import does not support.
                    PageReconstructResult::Continue(_) => bail!("Branches not supported"),
                    PageReconstructResult::Missing(_) => bail!("Failed to extract page image"),
                }
            }
        }
    }
    for filename in &deltafilenames {
        ensure!(filename.start_lsn < filename.end_lsn);
        // NOTE(review): only delta layers lying entirely at or above
        // snapshot_lsn are replayed; layers straddling snapshot_lsn are
        // skipped — confirm this is intended.
        if filename.start_lsn >= snapshot_lsn {
            let layer =
                DeltaLayer::new(self.tl.conf, self.tl.timelineid, self.tl.tenantid, filename);
            // Replay every stored page version at its original LSN.
            for (blk, lsn, ver) in layer.versions()? {
                self.put_page_version(filename.seg.rel, blk, lsn, ver)?;
            }
        }
    }
    info!("Import timeline completed in {:?}", now.elapsed());
    Ok(())
}
fn put_wal_record(&self, lsn: Lsn, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> {
self.put_page_version(rel, blknum, lsn, PageVersion::Delta(rec))
self.put_page_version(rel, blknum, lsn, PageVersion::Wal(rec))
}
fn put_page_image(&self, rel: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> {
self.put_page_version(rel, blknum, lsn, PageVersion::Image(img))
self.put_page_version(rel, blknum, lsn, PageVersion::Page(img))
}
fn put_truncation(&self, rel: RelishTag, lsn: Lsn, relsize: u32) -> Result<()> {

View File

@@ -0,0 +1,5 @@
// Layer-file support modules, used by the buffered repository's timeline
// import/export paths.
mod blob; // private blob read/write helpers shared by the layer formats
pub mod delta_layer;
pub mod filename;
pub mod image_layer;
pub mod storage_layer;

View File

@@ -39,9 +39,8 @@
//!
use crate::layered_repository::blob::BlobWriter;
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
};
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::repository::{PageReconstructData, PageReconstructResult, PageVersion};
use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
@@ -148,6 +147,10 @@ pub struct DeltaLayerInner {
}
impl Layer for DeltaLayer {
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -201,22 +204,22 @@ impl Layer for DeltaLayer {
for ((_blknum, pv_lsn), blob_range) in iter {
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
if let Some(img) = pv.page_image {
// Found a page image, return it
reconstruct_data.page_img = Some(img);
need_image = false;
break;
} else if let Some(rec) = pv.record {
let will_init = rec.will_init;
reconstruct_data.records.push((*pv_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
match pv {
PageVersion::Page(img) => {
// Found a page image, return it
reconstruct_data.page_img = Some(img);
need_image = false;
break;
}
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
PageVersion::Wal(rec) => {
let will_init = rec.will_init;
reconstruct_data.records.push((*pv_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
@@ -226,7 +229,7 @@ impl Layer for DeltaLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(PageReconstructResult::Continue(self.start_lsn))
Ok(PageReconstructResult::Continue(Lsn(self.start_lsn.0 - 1)))
} else {
Ok(PageReconstructResult::Complete)
}
@@ -307,19 +310,22 @@ impl Layer for DeltaLayer {
let buf = read_blob(&chapter, blob_range)?;
let pv = PageVersion::des(&buf)?;
if let Some(img) = pv.page_image.as_ref() {
write!(&mut desc, " img {} bytes", img.len())?;
}
if let Some(rec) = pv.record.as_ref() {
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
rec.rec.len(),
rec.will_init,
wal_desc
)?;
match pv {
PageVersion::Page(img) => {
write!(&mut desc, " img {} bytes", img.len())?;
}
PageVersion::Wal(rec) => {
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
rec.rec.len(),
rec.will_init,
wal_desc
)?;
}
}
println!(" blk {} at {}: {}", blk, lsn, desc);
}
@@ -328,6 +334,19 @@ impl Layer for DeltaLayer {
}
impl DeltaLayer {
/// Return all page versions stored in this delta layer as
/// (block number, LSN, version) tuples, in stored order.
/// Used by timeline import to replay a layer file into the store.
pub fn versions(&self) -> Result<Vec<(u32, Lsn, PageVersion)>> {
    let mut versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
    let inner = self.load()?;
    let (_path, book) = self.open_book()?;
    let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
    for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
        let buf = read_blob(&chapter, blob_range)?;
        versions.push((*blk, *lsn, PageVersion::des(&buf)?));
    }
    Ok(versions)
}
fn path_for(
path_or_conf: &PathOrConf,
timelineid: ZTimelineId,
@@ -442,12 +461,7 @@ impl DeltaLayer {
}
fn open_book(&self) -> Result<(PathBuf, Book<File>)> {
let path = Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
);
let path = self.path();
let file = File::open(&path)?;
let book = Book::new(file)?;

View File

@@ -13,8 +13,6 @@ use anyhow::Result;
use log::*;
use zenith_utils::lsn::Lsn;
use super::METADATA_FILE_NAME;
// Note: LayeredTimeline::load_layer_map() relies on this sort order
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct DeltaFileName {
@@ -292,7 +290,7 @@ pub fn list_files(
deltafiles.push(deltafilename);
} else if let Some(imgfilename) = ImageFileName::parse_str(fname) {
imgfiles.push(imgfilename);
} else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") {
} else if fname == "metadata" || fname == "ancestor" || fname.ends_with(".old") {
// ignore these
} else {
warn!("unrecognized filename in timeline dir: {}", fname);

View File

@@ -22,11 +22,8 @@
//! For non-blocky relishes, the image can be found in NONBLOCKY_IMAGE_CHAPTER.
//!
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::RELISH_SEG_SIZE;
use crate::layered_repository::storage_layer::{Layer, SegmentTag, RELISH_SEG_SIZE};
use crate::repository::{PageReconstructData, PageReconstructResult};
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{anyhow, bail, ensure, Result};
@@ -117,6 +114,10 @@ impl Layer for ImageLayer {
PathBuf::from(self.layer_name().to_string())
}
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -250,7 +251,7 @@ impl ImageLayer {
}
/// Create a new image file, using the given array of pages.
fn create(
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
@@ -325,6 +326,7 @@ impl ImageLayer {
Ok(layer)
}
/*
// Create a new image file by materializing every page in a source layer
// at given LSN.
pub fn create_from_src(
@@ -362,6 +364,7 @@ impl ImageLayer {
Self::create(conf, timelineid, timeline.tenantid, seg, lsn, base_images)
}
*/
///
/// Load the contents of the file into memory

View File

@@ -3,10 +3,9 @@
//!
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::ZTimelineId;
use crate::repository::{PageReconstructData, PageReconstructResult};
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
@@ -45,56 +44,6 @@ impl SegmentTag {
}
}
///
/// Represents a version of a page at a specific LSN. The LSN is the key of the
/// entry in the 'page_versions' hash, it is not duplicated here.
///
/// A page version can be stored as a full page image, or as WAL record that needs
/// to be applied over the previous page version to reconstruct this version.
///
/// It's also possible to have both a WAL record and a page image in the same
/// PageVersion. That happens if page version is originally stored as a WAL record
/// but it is later reconstructed by a GetPage@LSN request by performing WAL
/// redo. The get_page_at_lsn() code will store the reconstructed pag image next to
/// the WAL record in that case. TODO: That's pretty accidental, not the result
/// of any grand design. If we want to keep reconstructed page versions around, we
/// probably should have a separate buffer cache so that we could control the
/// replacement policy globally. Or if we keep a reconstructed page image, we
/// could throw away the WAL record.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageVersion {
/// an 8kb page image
pub page_image: Option<Bytes>,
/// WAL record to get from previous page version to this one.
pub record: Option<WALRecord>,
}
///
/// Data needed to reconstruct a page version
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
///
pub struct PageReconstructData {
pub records: Vec<(Lsn, WALRecord)>,
pub page_img: Option<Bytes>,
}
/// Return value from Layer::get_page_reconstruct_data
pub enum PageReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue(Lsn),
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
Missing(Lsn),
}
///
/// A Layer corresponds to one RELISH_SEG_SIZE slice of a relish in a range of LSNs.
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
@@ -104,6 +53,8 @@ pub enum PageReconstructResult {
/// in-memory and on-disk layers.
///
pub trait Layer: Send + Sync {
fn get_tenant_id(&self) -> ZTenantId;
/// Identify the timeline this relish belongs to
fn get_timeline_id(&self) -> ZTimelineId;

View File

@@ -11,6 +11,7 @@ pub mod basebackup;
pub mod branches;
pub mod buffered_repository;
pub mod http;
pub mod layered_repository;
pub mod page_service;
pub mod relish;
pub mod relish_storage;
@@ -33,10 +34,10 @@ pub mod defaults {
// Minimal size of WAL records chain to trigger materialization of the page
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_GC_HORIZON: u64 = 1600_000_000u64;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_GC_HORIZON: u64 = 2000_000_000u64;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10000);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;

View File

@@ -99,6 +99,12 @@ pub trait Timeline: Send + Sync {
/// Get a list of all existing non-relational objects
fn list_nonrels(&self, lsn: Lsn) -> Result<HashSet<RelishTag>>;
///
/// Export data as delta and image layers between 'start_lsn' and 'end_lsn'. The
/// start is inclusive, and end is exclusive.
///
fn export_timeline(&self, start_lsn: Lsn, end_lsn: Lsn) -> Result<()>;
/// Get the LSN where this branch was created
fn get_ancestor_lsn(&self) -> Lsn;
@@ -166,6 +172,11 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
/// Complete all delayed commits and advance disk_consistent_lsn
///
fn checkpoint(&self) -> Result<()>;
///
/// Import data from layer files
///
fn import_timeline(&self, snapshot_lsn: Lsn) -> Result<()>;
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -198,6 +209,39 @@ impl WALRecord {
}
}
///
/// A version of a page at a specific LSN: either a fully materialized page
/// image, or a WAL record that must be applied on top of the previous
/// version of the page to produce it.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PageVersion {
    /// an 8kb page image
    Page(Bytes),
    /// WAL record to get from previous page version to this one.
    Wal(WALRecord),
}

///
/// Data needed to reconstruct a page version
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
///
pub struct PageReconstructData {
    /// WAL records to apply over 'page_img'.
    pub records: Vec<(Lsn, WALRecord)>,
    /// Base image to start replay from; None if the first record
    /// initializes the page.
    pub page_img: Option<Bytes>,
}

/// Return value from Layer::get_page_reconstruct_data
pub enum PageReconstructResult {
    /// Got all the data needed to reconstruct the requested page
    Complete,
    /// This layer didn't contain all the required data, the caller should look up
    /// the predecessor layer at the returned LSN and collect more data from there.
    Continue(Lsn),
    /// This layer didn't contain data needed to reconstruct the page version at
    /// the returned LSN. This is usually considered an error, but might be OK
    /// in some circumstances.
    Missing(Lsn),
}
///
/// Tests that should work the same with any Repository/Timeline implementation.
///

View File

@@ -4,12 +4,13 @@ use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use tracing::*;
use yakv::storage::{Key, Storage, StorageIterator, Value};
use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value};
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
const COMMIT_THRESHOLD: usize = CACHE_SIZE / 2;
const COMMIT_THRESHOLD: usize = CACHE_SIZE / 4;
const WAL_FLUSH_THRESHOLD: u32 = 128; // 1Mb
///
/// Toast storage consists of two KV databases: one for storing main index
@@ -125,9 +126,12 @@ impl ToastStore {
Ok(ToastStore {
db: Storage::open(
&path.join("pageserver.db"),
None, //Some(&path.join("pageserver.log")),
CACHE_SIZE,
CHECKPOINT_INTERVAL,
Some(&path.join("pageserver.log")),
StorageConfig {
cache_size: CACHE_SIZE,
checkpoint_interval: CHECKPOINT_INTERVAL,
wal_flush_threshold: WAL_FLUSH_THRESHOLD,
},
)?,
committed: false,
})

View File

@@ -87,6 +87,10 @@ impl<K: Ord, V> VecMap<K, V> {
Ok(None)
}
/// Return the last entry of the map, or None if the map is empty.
pub fn last(&self) -> Option<&(K, V)> {
    self.0.last()
}
/// Split the map into two.
///
/// The left map contains everything before `cutoff` (exclusive).