diff --git a/Cargo.lock b/Cargo.lock
index af15d5776b..0b84391e0d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2571,7 +2571,9 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
 
 [[package]]
 name = "yakv"
-version = "0.1.8"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9e0d4aaa804d3b7acae0ee0e3c28999dd590a99391f784e18cd037ed9191655"
 dependencies = [
  "anyhow",
  "crc32c",
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index ad092976f1..9ff186c81a 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -38,7 +38,7 @@ const_format = "0.2.21"
 tracing = "0.1.27"
 signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
 #yakv = { path = "../../yakv" }
-yakv = "0.1.8"
+yakv = "0.1.9"
 lz4_flex = "0.9.0"
 postgres_ffi = { path = "../postgres_ffi" }
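The yakv bump is not just a version change: judging by the `toast_store.rs` hunk at the end of this patch, 0.1.9 replaces `Storage::open`'s positional cache/checkpoint arguments with a `StorageConfig` struct and adds a WAL flush threshold. A minimal sketch of the new call shape (`open_store` is an illustrative helper; the constants mirror `toast_store.rs` below):

```rust
use std::path::Path;

use anyhow::Result;
use yakv::storage::{Storage, StorageConfig};

// Values taken from toast_store.rs in this patch.
const CACHE_SIZE: usize = 32 * 1024; // 256Mb of 8Kb pages
const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
const WAL_FLUSH_THRESHOLD: u32 = 128; // 1Mb

/// Open a yakv store with an explicit WAL log and the 0.1.9 config struct.
fn open_store(path: &Path) -> Result<Storage> {
    let db = Storage::open(
        &path.join("pageserver.db"),
        Some(&path.join("pageserver.log")),
        StorageConfig {
            cache_size: CACHE_SIZE,
            checkpoint_interval: CHECKPOINT_INTERVAL,
            wal_flush_threshold: WAL_FLUSH_THRESHOLD,
        },
    )?;
    Ok(db)
}
```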
diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs
index 68db3f5869..46cb7ae805 100644
--- a/pageserver/src/buffered_repository.rs
+++ b/pageserver/src/buffered_repository.rs
@@ -33,7 +33,10 @@ use std::time::{Duration, Instant};
 use crate::relish::*;
 use crate::relish_storage::schedule_timeline_upload;
-use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord};
+use crate::repository::{
+    GcResult, PageReconstructData, PageReconstructResult, PageVersion, Repository, Timeline,
+    TimelineWriter, WALRecord,
+};
 use crate::tenant_mgr;
 use crate::toast_store::ToastStore;
 use crate::walreceiver;
@@ -42,12 +45,18 @@ use crate::walredo::WalRedoManager;
 use crate::PageServerConf;
 use crate::{ZTenantId, ZTimelineId};
 
+use crate::layered_repository::delta_layer::DeltaLayer;
+use crate::layered_repository::filename;
+use crate::layered_repository::image_layer::ImageLayer;
+use crate::layered_repository::storage_layer::{Layer, SegmentTag, RELISH_SEG_SIZE};
+
 use zenith_metrics::{register_histogram, register_int_gauge_vec, Histogram, IntGaugeVec};
 use zenith_metrics::{register_histogram_vec, HistogramVec};
 use zenith_utils::bin_ser::BeSer;
 use zenith_utils::crashsafe_dir;
 use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn};
 use zenith_utils::seqwait::SeqWait;
+use zenith_utils::vec_map::VecMap;
 
 static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
@@ -166,18 +175,6 @@ struct RelishStore {
     meta: Option>,
 }
 
-///
-/// Data needed to reconstruct a page version
-///
-/// 'page_img' is the old base image of the page to start the WAL replay with.
-/// It can be None, if the first WAL record initializes the page (will_init)
-/// 'records' contains the records to apply over the base image.
-///
-struct PageReconstructData {
-    records: Vec<(Lsn, WALRecord)>,
-    page_img: Option<Bytes>,
-}
-
 /// Public interface
 impl Repository for BufferedRepository {
     fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
@@ -787,8 +784,8 @@ impl Timeline for BufferedTimeline {
             if let StoreKey::Data(dk) = key {
                 let ver = PageVersion::des(&pair.1)?;
                 match ver {
-                    PageVersion::Image(img) => Ok(img), // already materialized: we are done
-                    PageVersion::Delta(rec) => {
+                    PageVersion::Page(img) => Ok(img), // already materialized: we are done
+                    PageVersion::Wal(rec) => {
                         let mut will_init = rec.will_init;
                         let mut data = PageReconstructData {
                             records: Vec::new(),
@@ -806,11 +803,11 @@
                             if let StoreKey::Data(dk) = key {
                                 assert!(dk.rel == rel); // check that we don't jump to previous relish before locating full image
                                 match ver {
-                                    PageVersion::Image(img) => {
+                                    PageVersion::Page(img) => {
                                         data.page_img = Some(img);
                                         break;
                                     }
-                                    PageVersion::Delta(rec) => {
+                                    PageVersion::Wal(rec) => {
                                         will_init = rec.will_init;
                                         data.records.push((dk.lsn, rec));
                                     }
@@ -926,6 +923,191 @@
             .observe_closure_duration(|| self.checkpoint_internal(0, true))
     }
 
+    ///
+    /// Export data as delta and image layers between 'start_lsn' and 'end_lsn'. The
+    /// start is inclusive, and the end is exclusive.
+    ///
+    fn export_timeline(&self, start_lsn: Lsn, end_lsn: Lsn) -> Result<()> {
+        let now = Instant::now();
+        let _enter = info_span!("export timeline", timeline = %self.timelineid, tenant = %self.tenantid, start_lsn = %start_lsn, end_lsn = %end_lsn).entered();
+
+        info!("exporting timeline");
+        let zero_rel = RelishTag::Relation(ZERO_TAG);
+        let mut from_rel = zero_rel;
+        let mut from = StoreKey::Metadata(MetadataKey {
+            rel: from_rel,
+            lsn: start_lsn,
+        });
+        let mut relsizes: HashMap<RelishTag, VecMap<Lsn, u32>> = HashMap::new();
+        let mut dropped: HashSet<RelishTag> = HashSet::new();
+        let store = self.store.read().unwrap();
+        'meta: loop {
+            let mut iter = store.data.range(&from.ser()?..);
+            while let Some(entry) = iter.next() {
+                let pair = entry?;
+                if let StoreKey::Metadata(dk) = StoreKey::des(&pair.0)? {
+                    // processing metadata
+                    from_rel = dk.rel;
+                    if dk.lsn < start_lsn {
+                        from = StoreKey::Metadata(MetadataKey {
+                            rel: from_rel,
+                            lsn: start_lsn,
+                        });
+                        continue 'meta;
+                    } else if dk.lsn >= end_lsn {
+                        from = StoreKey::Metadata(MetadataKey {
+                            rel: from_rel,
+                            lsn: Lsn::MAX,
+                        });
+                        continue 'meta;
+                    } else {
+                        let meta = MetadataValue::des(&pair.1)?;
+                        if let Some(size) = meta.size {
+                            if let Some(sizes) = relsizes.get_mut(&dk.rel) {
+                                sizes.append(dk.lsn, size).unwrap();
+                            } else {
+                                let mut sizes = VecMap::default();
+                                sizes.append(dk.lsn, size).unwrap();
+                                relsizes.insert(dk.rel, sizes);
+                            }
+                        } else {
+                            dropped.insert(dk.rel);
+                        }
+                    }
+                } else {
+                    // End of metadata
+                    break 'meta;
+                }
+            }
+            break;
+        }
+
+        from_rel = zero_rel;
+        from = StoreKey::Data(DataKey {
+            rel: from_rel,
+            blknum: 0,
+            lsn: Lsn(0),
+        });
+
+        // block number currently being processed
+        let mut from_blknum = 0;
+        let mut page_versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
+        'pages: loop {
+            let mut iter = store.data.range(&from.ser()?..);
+            while let Some(entry) = iter.next() {
+                let pair = entry?;
+                if let StoreKey::Data(dk) = StoreKey::des(&pair.0)? {
+                    let same_seg = from_rel == dk.rel
+                        && dk.blknum / RELISH_SEG_SIZE == from_blknum / RELISH_SEG_SIZE;
+                    if !same_seg && from_rel != zero_rel {
+                        let is_dropped = dropped.contains(&from_rel);
+                        let segtag = SegmentTag::from_blknum(from_rel, from_blknum);
+                        if !page_versions.is_empty() {
+                            DeltaLayer::create(
+                                self.conf,
+                                self.timelineid,
+                                self.tenantid,
+                                segtag,
+                                start_lsn,
+                                end_lsn,
+                                is_dropped,
+                                page_versions.iter().map(|t| (t.0, t.1, &t.2)),
+                                relsizes[&from_rel].clone(),
+                            )?;
+                            page_versions.clear();
+                        }
+                        if !is_dropped {
+                            let mut images: Vec<Bytes> =
+                                Vec::with_capacity(RELISH_SEG_SIZE as usize);
+                            let first_blknum = (from_blknum / RELISH_SEG_SIZE) * RELISH_SEG_SIZE;
+                            let last_blknum = u32::min(
+                                first_blknum + RELISH_SEG_SIZE,
+                                relsizes[&from_rel].last().map(|p| p.1).unwrap_or(0),
+                            );
+                            if first_blknum < last_blknum {
+                                for blk in first_blknum..last_blknum {
+                                    images.push(self.get_page_at_lsn(from_rel, blk, end_lsn)?);
+                                }
+                                ImageLayer::create(
+                                    self.conf,
+                                    self.timelineid,
+                                    self.tenantid,
+                                    segtag,
+                                    end_lsn,
+                                    images,
+                                )?;
+                            }
+                        }
+                    }
+                    from_rel = dk.rel;
+                    from_blknum = dk.blknum;
+                    if dk.lsn < start_lsn {
+                        from = StoreKey::Data(DataKey {
+                            rel: from_rel,
+                            blknum: from_blknum,
+                            lsn: start_lsn,
+                        });
+                    } else if dk.lsn >= end_lsn {
+                        from_blknum += 1;
+                        from = StoreKey::Data(DataKey {
+                            rel: from_rel,
+                            blknum: from_blknum,
+                            lsn: start_lsn,
+                        });
+                    } else {
+                        page_versions.push((dk.blknum, dk.lsn, PageVersion::des(&pair.1)?));
+                        continue;
+                    }
+                    continue 'pages;
+                } else {
+                    break 'pages;
+                }
+            }
+            break;
+        }
+        if from_rel != zero_rel {
+            let is_dropped = dropped.contains(&from_rel);
+            let segtag = SegmentTag::from_blknum(from_rel, from_blknum);
+            if !page_versions.is_empty() {
+                DeltaLayer::create(
+                    self.conf,
+                    self.timelineid,
+                    self.tenantid,
+                    segtag,
+                    start_lsn,
+                    end_lsn,
+                    is_dropped,
+                    page_versions.iter().map(|t| (t.0, t.1, &t.2)),
+                    relsizes[&from_rel].clone(),
+                )?;
+            }
+            if !is_dropped {
+                let mut images: Vec<Bytes> = Vec::with_capacity(RELISH_SEG_SIZE as usize);
+                let first_blknum = (from_blknum / RELISH_SEG_SIZE) * RELISH_SEG_SIZE;
+                let last_blknum = u32::min(
+                    first_blknum + RELISH_SEG_SIZE,
+                    relsizes[&from_rel].last().map(|p| p.1).unwrap_or(0),
+                );
+                if first_blknum < last_blknum {
+                    for blk in first_blknum..last_blknum {
+                        images.push(self.get_page_at_lsn(from_rel, blk, end_lsn)?);
+                    }
+                    ImageLayer::create(
+                        self.conf,
+                        self.timelineid,
+                        self.tenantid,
+                        segtag,
+                        end_lsn,
+                        images,
+                    )?;
+                }
+            }
+        }
+        info!("Exported timeline in {:?}", now.elapsed());
+        Ok(())
+    }
+
     fn get_last_record_lsn(&self) -> Lsn {
         self.last_record_lsn.load().last
     }
@@ -1213,7 +1395,7 @@ impl BufferedTimeline {
                     debug_assert!(key < till);
                     if let StoreKey::Data(dk) = StoreKey::des(&key)? {
                         let ver = PageVersion::des(&pair.1)?;
-                        if let PageVersion::Delta(rec) = ver {
+                        if let PageVersion::Wal(rec) = ver {
                             // ignore already materialized pages
                             let mut will_init = rec.will_init;
                             let mut data = PageReconstructData {
@@ -1233,11 +1415,11 @@
                                 if let StoreKey::Data(dk2) = key {
                                     assert!(dk.rel == dk2.rel); // check that we don't jump to previous relish before locating full image
                                     match ver {
-                                        PageVersion::Image(img) => {
+                                        PageVersion::Page(img) => {
                                             data.page_img = Some(img);
                                             break;
                                         }
-                                        PageVersion::Delta(rec) => {
+                                        PageVersion::Wal(rec) => {
                                             will_init = rec.will_init;
                                             history_len += rec.rec.len();
                                             data.records.push((dk2.lsn, rec));
@@ -1260,7 +1442,7 @@
                                 });
 
                                 let mut store = self.store.write().unwrap();
-                                store.data.put(&key, &PageVersion::Image(img?).ser()?)?;
+                                store.data.put(&key, &PageVersion::Page(img?).ser()?)?;
                                 n_checkpointed_records += 1;
                             }
                         }
@@ -1339,7 +1521,8 @@
         info!("GC starting");
 
-        let mut from_rel = RelishTag::Relation(ZERO_TAG);
+        let zero_rel = RelishTag::Relation(ZERO_TAG);
+        let mut from_rel = zero_rel;
         let mut from = StoreKey::Metadata(MetadataKey {
             rel: from_rel,
             lsn: Lsn(0),
@@ -1414,7 +1597,7 @@
             break;
         }
 
-        from_rel = RelishTag::Relation(ZERO_TAG);
+        from_rel = zero_rel;
         from = StoreKey::Data(DataKey {
             rel: from_rel,
             blknum: 0,
@@ -1454,7 +1637,7 @@
                         // .. and have something to remove
                         // ... and have page image
                         let ver = PageVersion::des(&pair.1)?;
-                        if let PageVersion::Image(_) = ver {
+                        if let PageVersion::Page(_) = ver {
                             // ... then remove all previously accumulated deltas and images, as they are not needed any more
                             drop(store);
                             let mut store = self.store.write().unwrap();
@@ -1578,14 +1761,6 @@ impl Deref for BufferedTimelineWriter<'_> {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum PageVersion {
-    /// an 8kb page image
-    Image(Bytes),
-    /// WAL record to get from previous page version to this one.
-    Delta(WALRecord),
-}
-
 impl<'a> BufferedTimelineWriter<'a> {
     fn put_page_version(
         &self,
@@ -1645,12 +1820,63 @@ impl<'a> BufferedTimelineWriter<'a> {
     }
 }
 
 impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
+    ///
+    /// Import data from layer files
+    ///
+    fn import_timeline(&self, snapshot_lsn: Lsn) -> Result<()> {
+        let now = Instant::now();
+        let (imgfilenames, deltafilenames) =
+            filename::list_files(self.tl.conf, self.tl.timelineid, self.tl.tenantid)?;
+
+        let mut data = PageReconstructData {
+            records: Vec::new(),
+            page_img: None,
+        };
+
+        for filename in &imgfilenames {
+            if filename.lsn == snapshot_lsn {
+                let layer =
+                    ImageLayer::new(self.tl.conf, self.tl.timelineid, self.tl.tenantid, filename);
+                let seg_size = layer.get_seg_size(snapshot_lsn)?;
+                for blknum in 0..seg_size {
+                    match layer.get_page_reconstruct_data(blknum, snapshot_lsn, &mut data)? {
+                        PageReconstructResult::Complete => {
+                            if let Some(page) = data.page_img.take() {
+                                self.put_page_image(
+                                    filename.seg.rel,
+                                    filename.seg.segno * RELISH_SEG_SIZE + blknum,
+                                    snapshot_lsn,
+                                    page,
+                                )?;
+                            }
+                        }
+                        PageReconstructResult::Continue(_) => bail!("Branches not supported"),
+                        PageReconstructResult::Missing(_) => bail!("Failed to extract page image"),
+                    }
+                }
+            }
+        }
+
+        for filename in &deltafilenames {
+            ensure!(filename.start_lsn < filename.end_lsn);
+            if filename.start_lsn >= snapshot_lsn {
+                let layer =
+                    DeltaLayer::new(self.tl.conf, self.tl.timelineid, self.tl.tenantid, filename);
+                for (blk, lsn, ver) in layer.versions()? {
+                    self.put_page_version(filename.seg.rel, blk, lsn, ver)?;
+                }
+            }
+        }
+        info!("Import timeline completed in {:?}", now.elapsed());
+        Ok(())
+    }
+
     fn put_wal_record(&self, lsn: Lsn, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> {
-        self.put_page_version(rel, blknum, lsn, PageVersion::Delta(rec))
+        self.put_page_version(rel, blknum, lsn, PageVersion::Wal(rec))
     }
 
     fn put_page_image(&self, rel: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> {
-        self.put_page_version(rel, blknum, lsn, PageVersion::Image(img))
+        self.put_page_version(rel, blknum, lsn, PageVersion::Page(img))
     }
 
     fn put_truncation(&self, rel: RelishTag, lsn: Lsn, relsize: u32) -> Result<()> {
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
new file mode 100644
index 0000000000..bdaebf5718
--- /dev/null
+++ b/pageserver/src/layered_repository.rs
@@ -0,0 +1,5 @@
+mod blob;
+pub mod delta_layer;
+pub mod filename;
+pub mod image_layer;
+pub mod storage_layer;
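The buffered-repository half of the patch is now complete: `Timeline::export_timeline` writes the yakv store out as delta and image layer files, and `TimelineWriter::import_timeline` re-ingests such files. A minimal sketch of the intended round trip; `export_then_import` is an illustrative helper, and how the caller obtains the timeline and its writer is elided:

```rust
use anyhow::Result;
use zenith_utils::lsn::Lsn;

use pageserver::repository::{Timeline, TimelineWriter};

/// Export [start_lsn, end_lsn) as layer files, then read them back in.
fn export_then_import(
    tl: &dyn Timeline,
    writer: &dyn TimelineWriter,
    start_lsn: Lsn,
    end_lsn: Lsn,
) -> Result<()> {
    tl.export_timeline(start_lsn, end_lsn)?; // start inclusive, end exclusive
    writer.import_timeline(end_lsn)?; // image layers are created at end_lsn
    Ok(())
}
```

Note the asymmetry: export materializes image layers at `end_lsn`, while import selects image files whose LSN equals the snapshot LSN and delta files starting at or after it, so the snapshot LSN passed to `import_timeline` determines which of the exported files are picked up.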
diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs
index 24ed9d6e69..e4a0c2fccf 100644
--- a/pageserver/src/layered_repository/delta_layer.rs
+++ b/pageserver/src/layered_repository/delta_layer.rs
@@ -39,9 +39,8 @@
 //!
 use crate::layered_repository::blob::BlobWriter;
 use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
-use crate::layered_repository::storage_layer::{
-    Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
-};
+use crate::layered_repository::storage_layer::{Layer, SegmentTag};
+use crate::repository::{PageReconstructData, PageReconstructResult, PageVersion};
 use crate::waldecoder;
 use crate::PageServerConf;
 use crate::{ZTenantId, ZTimelineId};
@@ -148,6 +147,10 @@ pub struct DeltaLayerInner {
 }
 
 impl Layer for DeltaLayer {
+    fn get_tenant_id(&self) -> ZTenantId {
+        self.tenantid
+    }
+
     fn get_timeline_id(&self) -> ZTimelineId {
         self.timelineid
     }
@@ -201,22 +204,22 @@ impl Layer for DeltaLayer {
         for ((_blknum, pv_lsn), blob_range) in iter {
             let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
 
-            if let Some(img) = pv.page_image {
-                // Found a page image, return it
-                reconstruct_data.page_img = Some(img);
-                need_image = false;
-                break;
-            } else if let Some(rec) = pv.record {
-                let will_init = rec.will_init;
-                reconstruct_data.records.push((*pv_lsn, rec));
-                if will_init {
-                    // This WAL record initializes the page, so no need to go further back
+            match pv {
+                PageVersion::Page(img) => {
+                    // Found a page image, return it
+                    reconstruct_data.page_img = Some(img);
                     need_image = false;
                     break;
                 }
-            } else {
-                // No base image, and no WAL record. Huh?
-                bail!("no page image or WAL record for requested page");
+                PageVersion::Wal(rec) => {
+                    let will_init = rec.will_init;
+                    reconstruct_data.records.push((*pv_lsn, rec));
+                    if will_init {
+                        // This WAL record initializes the page, so no need to go further back
+                        need_image = false;
+                        break;
+                    }
+                }
             }
         }
@@ -226,7 +229,7 @@
         // If an older page image is needed to reconstruct the page, let the
         // caller know.
         if need_image {
-            Ok(PageReconstructResult::Continue(self.start_lsn))
+            Ok(PageReconstructResult::Continue(Lsn(self.start_lsn.0 - 1)))
         } else {
             Ok(PageReconstructResult::Complete)
         }
@@ -307,19 +310,22 @@ impl Layer for DeltaLayer {
             let buf = read_blob(&chapter, blob_range)?;
             let pv = PageVersion::des(&buf)?;
 
-            if let Some(img) = pv.page_image.as_ref() {
-                write!(&mut desc, " img {} bytes", img.len())?;
-            }
-            if let Some(rec) = pv.record.as_ref() {
-                let wal_desc = waldecoder::describe_wal_record(&rec.rec);
-                write!(
-                    &mut desc,
-                    " rec {} bytes will_init: {} {}",
-                    rec.rec.len(),
-                    rec.will_init,
-                    wal_desc
-                )?;
+            match pv {
+                PageVersion::Page(img) => {
+                    write!(&mut desc, " img {} bytes", img.len())?;
+                }
+                PageVersion::Wal(rec) => {
+                    let wal_desc = waldecoder::describe_wal_record(&rec.rec);
+                    write!(
+                        &mut desc,
+                        " rec {} bytes will_init: {} {}",
+                        rec.rec.len(),
+                        rec.will_init,
+                        wal_desc
+                    )?;
+                }
             }
+
             println!("  blk {} at {}: {}", blk, lsn, desc);
         }
@@ -328,6 +334,19 @@
 }
 
 impl DeltaLayer {
+    /// Return all page versions stored in this layer
+    pub fn versions(&self) -> Result<Vec<(u32, Lsn, PageVersion)>> {
+        let mut versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
+        let inner = self.load()?;
+        let (_path, book) = self.open_book()?;
+        let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
+        for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
+            let buf = read_blob(&chapter, blob_range)?;
+            versions.push((*blk, *lsn, PageVersion::des(&buf)?));
+        }
+        Ok(versions)
+    }
+
     fn path_for(
         path_or_conf: &PathOrConf,
         timelineid: ZTimelineId,
@@ -442,12 +461,7 @@
     }
 
     fn open_book(&self) -> Result<(PathBuf, Book)> {
-        let path = Self::path_for(
-            &self.path_or_conf,
-            self.timelineid,
-            self.tenantid,
-            &self.layer_name(),
-        );
+        let path = self.path();
 
         let file = File::open(&path)?;
         let book = Book::new(file)?;
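One way to consume the new `DeltaLayer::versions()` accessor, mirroring what `import_timeline` does above; `dump_delta_layer` is an illustrative helper, and the `DeltaFileName` argument is assumed to come from `filename::list_files`:

```rust
use anyhow::Result;

use pageserver::layered_repository::delta_layer::DeltaLayer;
use pageserver::layered_repository::filename::DeltaFileName;
use pageserver::repository::PageVersion;
use pageserver::PageServerConf;
use pageserver::{ZTenantId, ZTimelineId};

/// Print every page version stored in a delta layer file (sketch).
fn dump_delta_layer(
    conf: &'static PageServerConf,
    timelineid: ZTimelineId,
    tenantid: ZTenantId,
    name: &DeltaFileName,
) -> Result<()> {
    let layer = DeltaLayer::new(conf, timelineid, tenantid, name);
    for (blknum, lsn, ver) in layer.versions()? {
        match ver {
            PageVersion::Page(img) => {
                println!("blk {} at {}: image, {} bytes", blknum, lsn, img.len())
            }
            PageVersion::Wal(rec) => {
                println!("blk {} at {}: WAL record, {} bytes", blknum, lsn, rec.rec.len())
            }
        }
    }
    Ok(())
}
```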
diff --git a/pageserver/src/layered_repository/filename.rs b/pageserver/src/layered_repository/filename.rs
index afa106f939..8000d47112 100644
--- a/pageserver/src/layered_repository/filename.rs
+++ b/pageserver/src/layered_repository/filename.rs
@@ -13,8 +13,6 @@ use anyhow::Result;
 use log::*;
 use zenith_utils::lsn::Lsn;
 
-use super::METADATA_FILE_NAME;
-
 // Note: LayeredTimeline::load_layer_map() relies on this sort order
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
 pub struct DeltaFileName {
@@ -292,7 +290,7 @@ pub fn list_files(
             deltafiles.push(deltafilename);
         } else if let Some(imgfilename) = ImageFileName::parse_str(fname) {
             imgfiles.push(imgfilename);
-        } else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") {
+        } else if fname == "metadata" || fname == "ancestor" || fname.ends_with(".old") {
             // ignore these
         } else {
             warn!("unrecognized filename in timeline dir: {}", fname);
diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs
index 744f793558..0af37b1c00 100644
--- a/pageserver/src/layered_repository/image_layer.rs
+++ b/pageserver/src/layered_repository/image_layer.rs
@@ -22,11 +22,8 @@
 //! For non-blocky relishes, the image can be found in NONBLOCKY_IMAGE_CHAPTER.
 //!
 use crate::layered_repository::filename::{ImageFileName, PathOrConf};
-use crate::layered_repository::storage_layer::{
-    Layer, PageReconstructData, PageReconstructResult, SegmentTag,
-};
-use crate::layered_repository::LayeredTimeline;
-use crate::layered_repository::RELISH_SEG_SIZE;
+use crate::layered_repository::storage_layer::{Layer, SegmentTag, RELISH_SEG_SIZE};
+use crate::repository::{PageReconstructData, PageReconstructResult};
 use crate::PageServerConf;
 use crate::{ZTenantId, ZTimelineId};
 use anyhow::{anyhow, bail, ensure, Result};
@@ -117,6 +114,10 @@ impl Layer for ImageLayer {
         PathBuf::from(self.layer_name().to_string())
     }
 
+    fn get_tenant_id(&self) -> ZTenantId {
+        self.tenantid
+    }
+
     fn get_timeline_id(&self) -> ZTimelineId {
         self.timelineid
     }
@@ -250,7 +251,7 @@ impl ImageLayer {
     }
 
     /// Create a new image file, using the given array of pages.
-    fn create(
+    pub fn create(
         conf: &'static PageServerConf,
         timelineid: ZTimelineId,
         tenantid: ZTenantId,
@@ -325,6 +326,7 @@ impl ImageLayer {
         Ok(layer)
     }
 
+    /*
     // Create a new image file by materializing every page in a source layer
     // at given LSN.
     pub fn create_from_src(
@@ -362,6 +364,7 @@ impl ImageLayer {
         Self::create(conf, timelineid, timeline.tenantid, seg, lsn, base_images)
     }
+    */
 
     ///
     /// Load the contents of the file into memory
diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs
index 0a86fe407d..7f1a80731f 100644
--- a/pageserver/src/layered_repository/storage_layer.rs
+++ b/pageserver/src/layered_repository/storage_layer.rs
@@ -3,10 +3,9 @@
 //!
 use crate::relish::RelishTag;
-use crate::repository::WALRecord;
-use crate::ZTimelineId;
+use crate::repository::{PageReconstructData, PageReconstructResult};
+use crate::{ZTenantId, ZTimelineId};
 use anyhow::Result;
-use bytes::Bytes;
 use serde::{Deserialize, Serialize};
 use std::fmt;
 use std::path::PathBuf;
@@ -45,56 +44,6 @@ impl SegmentTag {
     }
 }
 
-///
-/// Represents a version of a page at a specific LSN. The LSN is the key of the
-/// entry in the 'page_versions' hash, it is not duplicated here.
-///
-/// A page version can be stored as a full page image, or as WAL record that needs
-/// to be applied over the previous page version to reconstruct this version.
-///
-/// It's also possible to have both a WAL record and a page image in the same
-/// PageVersion. That happens if page version is originally stored as a WAL record
-/// but it is later reconstructed by a GetPage@LSN request by performing WAL
-/// redo. The get_page_at_lsn() code will store the reconstructed pag image next to
-/// the WAL record in that case. TODO: That's pretty accidental, not the result
-/// of any grand design. If we want to keep reconstructed page versions around, we
-/// probably should have a separate buffer cache so that we could control the
-/// replacement policy globally. Or if we keep a reconstructed page image, we
-/// could throw away the WAL record.
-///
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct PageVersion {
-    /// an 8kb page image
-    pub page_image: Option<Bytes>,
-    /// WAL record to get from previous page version to this one.
-    pub record: Option<WALRecord>,
-}
-
-///
-/// Data needed to reconstruct a page version
-///
-/// 'page_img' is the old base image of the page to start the WAL replay with.
-/// It can be None, if the first WAL record initializes the page (will_init)
-/// 'records' contains the records to apply over the base image.
-///
-pub struct PageReconstructData {
-    pub records: Vec<(Lsn, WALRecord)>,
-    pub page_img: Option<Bytes>,
-}
-
-/// Return value from Layer::get_page_reconstruct_data
-pub enum PageReconstructResult {
-    /// Got all the data needed to reconstruct the requested page
-    Complete,
-    /// This layer didn't contain all the required data, the caller should look up
-    /// the predecessor layer at the returned LSN and collect more data from there.
-    Continue(Lsn),
-    /// This layer didn't contain data needed to reconstruct the page version at
-    /// the returned LSN. This is usually considered an error, but might be OK
-    /// in some circumstances.
-    Missing(Lsn),
-}
-
 ///
 /// A Layer corresponds to one RELISH_SEG_SIZE slice of a relish in a range of LSNs.
 /// There are two kinds of layers, in-memory and on-disk layers. In-memory
@@ -104,6 +53,8 @@ pub enum PageReconstructResult {
 /// in-memory and on-disk layers.
 ///
 pub trait Layer: Send + Sync {
+    fn get_tenant_id(&self) -> ZTenantId;
+
     /// Identify the timeline this relish belongs to
     fn get_timeline_id(&self) -> ZTimelineId;
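For reference, the block-to-segment arithmetic that `SegmentTag::from_blknum` and the export path depend on: `RELISH_SEG_SIZE` is a block count, not necessarily a power of two, so segment bounds come from division rather than bit masking. A small sketch (`seg_block_range` is an illustrative helper):

```rust
use pageserver::layered_repository::storage_layer::RELISH_SEG_SIZE;

/// Block range covered by the segment containing `blknum`:
/// first block inclusive, last block exclusive.
fn seg_block_range(blknum: u32) -> (u32, u32) {
    let segno = blknum / RELISH_SEG_SIZE;
    let first_blknum = segno * RELISH_SEG_SIZE;
    (first_blknum, first_blknum + RELISH_SEG_SIZE)
}
```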
-/// -pub struct PageReconstructData { - pub records: Vec<(Lsn, WALRecord)>, - pub page_img: Option, -} - -/// Return value from Layer::get_page_reconstruct_data -pub enum PageReconstructResult { - /// Got all the data needed to reconstruct the requested page - Complete, - /// This layer didn't contain all the required data, the caller should look up - /// the predecessor layer at the returned LSN and collect more data from there. - Continue(Lsn), - /// This layer didn't contain data needed to reconstruct the page version at - /// the returned LSN. This is usually considered an error, but might be OK - /// in some circumstances. - Missing(Lsn), -} - /// /// A Layer corresponds to one RELISH_SEG_SIZE slice of a relish in a range of LSNs. /// There are two kinds of layers, in-memory and on-disk layers. In-memory @@ -104,6 +53,8 @@ pub enum PageReconstructResult { /// in-memory and on-disk layers. /// pub trait Layer: Send + Sync { + fn get_tenant_id(&self) -> ZTenantId; + /// Identify the timeline this relish belongs to fn get_timeline_id(&self) -> ZTimelineId; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index f2bfd6c48b..d9c939f1c0 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -11,6 +11,7 @@ pub mod basebackup; pub mod branches; pub mod buffered_repository; pub mod http; +pub mod layered_repository; pub mod page_service; pub mod relish; pub mod relish_storage; @@ -33,10 +34,10 @@ pub mod defaults { // Minimal size of WAL records chain to trigger materialization of the page pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0; - pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); + pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10); - pub const DEFAULT_GC_HORIZON: u64 = 1600_000_000u64; - pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10); + pub const DEFAULT_GC_HORIZON: u64 = 2000_000_000u64; + pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10000); pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 4016c900c8..c9b6a8f3f4 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -99,6 +99,12 @@ pub trait Timeline: Send + Sync { /// Get a list of all existing non-relational objects fn list_nonrels(&self, lsn: Lsn) -> Result>; + /// + /// Export data as delats and image layers between 'start_lsn' to 'end_lsn'. The + /// start is inclusive, and end is exclusive. + /// + fn export_timeline(&self, start_lsn: Lsn, end_lsn: Lsn) -> Result<()>; + /// Get the LSN where this branch was created fn get_ancestor_lsn(&self) -> Lsn; @@ -166,6 +172,11 @@ pub trait TimelineWriter: Deref { /// Complete all delayed commits and advance disk_consistent_lsn /// fn checkpoint(&self) -> Result<()>; + + /// + /// Import data from layer files + /// + fn import_timeline(&self, snapshot_lsn: Lsn) -> Result<()>; } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -198,6 +209,39 @@ impl WALRecord { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PageVersion { + /// an 8kb page image + Page(Bytes), + /// WAL record to get from previous page version to this one. + Wal(WALRecord), +} + +/// +/// Data needed to reconstruct a page version +/// +/// 'page_img' is the old base image of the page to start the WAL replay with. 
diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs
index 2dda5e7750..f9afe117f7 100644
--- a/pageserver/src/toast_store.rs
+++ b/pageserver/src/toast_store.rs
@@ -4,12 +4,13 @@ use std::convert::TryInto;
 use std::ops::{Bound, RangeBounds};
 use std::path::Path;
 use tracing::*;
-use yakv::storage::{Key, Storage, StorageIterator, Value};
+use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value};
 
 const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
 const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
 const CACHE_SIZE: usize = 32 * 1024; // 256Mb
-const COMMIT_THRESHOLD: usize = CACHE_SIZE / 2;
+const COMMIT_THRESHOLD: usize = CACHE_SIZE / 4;
+const WAL_FLUSH_THRESHOLD: u32 = 128; // 1Mb
 
 ///
 /// Toast storage consists of two KV databases: one for storing main index
@@ -125,9 +126,12 @@ impl ToastStore {
         Ok(ToastStore {
             db: Storage::open(
                 &path.join("pageserver.db"),
-                None, //Some(&path.join("pageserver.log")),
-                CACHE_SIZE,
-                CHECKPOINT_INTERVAL,
+                Some(&path.join("pageserver.log")),
+                StorageConfig {
+                    cache_size: CACHE_SIZE,
+                    checkpoint_interval: CHECKPOINT_INTERVAL,
+                    wal_flush_threshold: WAL_FLUSH_THRESHOLD,
+                },
             )?,
             committed: false,
         })
diff --git a/zenith_utils/src/vec_map.rs b/zenith_utils/src/vec_map.rs
index 4e2c827b47..8eaf0f0970 100644
--- a/zenith_utils/src/vec_map.rs
+++ b/zenith_utils/src/vec_map.rs
@@ -87,6 +87,10 @@ impl<K: Ord, V> VecMap<K, V> {
         Ok(None)
     }
 
+    pub fn last(&self) -> Option<&(K, V)> {
+        self.0.last()
+    }
+
     /// Split the map into two.
     ///
     /// The left map contains everything before `cutoff` (exclusive).
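Finally, the new `VecMap::last` accessor is what lets `export_timeline` read back the most recent relation size it appended; a minimal usage sketch (`latest_size` is illustrative):

```rust
use zenith_utils::lsn::Lsn;
use zenith_utils::vec_map::VecMap;

/// Track per-LSN relation sizes and read back the most recent entry,
/// as export_timeline does with its `relsizes` map.
fn latest_size() -> u32 {
    let mut sizes: VecMap<Lsn, u32> = VecMap::default();
    sizes.append(Lsn(0x10), 100).unwrap();
    sizes.append(Lsn(0x20), 150).unwrap();
    sizes.last().map(|p| p.1).unwrap_or(0) // -> 150
}
```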