Compare commits

...

2 Commits

Author            SHA1        Message                                            Date
Patrick Insinger  f5432ea1ca  add SegmentTag to Layer queries                    2022-01-02 20:35:06 -08:00
Patrick Insinger  066e3f1c69  pageserver - initial work on layer segment ranges  2021-12-19 20:33:10 -08:00
                              * add end segment tags to layer filenames
                              * introduce naive segment ranges in delta and image layer formats
9 changed files with 320 additions and 319 deletions
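Taken together, the two commits generalize a layer file from covering a single segment to covering a half-open segment range: every Layer query gains an explicit SegmentTag argument, and layer file names now carry both a start and an exclusive end segment tag. For illustration only (the relation numbers are invented), a delta layer file name changes roughly like this:

    old: rel_1663_13990_2609_0_5_0000000000000020_0000000000000030
    new: rel_1663_13990_2609_0_5_rel_1663_13990_2609_0_6_0000000000000020_0000000000000030

The file paths noted on the "View File" markers below are inferred from the code in the hunks; the compare view did not preserve them. Diff markers (-/+) were likewise lost in rendering and have been restored on changed lines; unmarked lines are context.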

View File: pageserver/src/layered_repository.rs (inferred)

@@ -830,7 +830,7 @@ impl Timeline for LayeredTimeline {
let segsize;
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
- segsize = layer.get_seg_size(lsn)?;
+ segsize = layer.get_seg_size(seg, lsn)?;
trace!("get_seg_size: {} at {} -> {}", seg, lsn, segsize);
} else {
if segno == 0 {
@@ -854,7 +854,7 @@ impl Timeline for LayeredTimeline {
let result;
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
- result = layer.get_seg_exists(lsn)?;
+ result = layer.get_seg_exists(seg, lsn)?;
} else {
result = false;
}
@@ -1867,7 +1867,7 @@ impl LayeredTimeline {
let mut curr_lsn = lsn;
loop {
let result = layer_ref
- .get_page_reconstruct_data(blknum, curr_lsn, cached_lsn_opt, &mut data)
+ .get_page_reconstruct_data(seg, blknum, curr_lsn, cached_lsn_opt, &mut data)
.with_context(|| {
format!(
"Failed to get reconstruct data {} {:?} {} {} {:?}",

View File: pageserver/src/layered_repository/blob.rs (inferred)

@@ -5,10 +5,10 @@ use anyhow::Result;
use bookfile::{BookWriter, BoundedReader, ChapterId, ChapterWriter};
use serde::{Deserialize, Serialize};
- #[derive(Serialize, Deserialize)]
+ #[derive(Serialize, Deserialize, Clone)]
pub struct BlobRange {
- offset: u64,
- size: usize,
+ pub offset: u64,
+ pub size: usize,
}
pub fn read_blob<F: FileExt>(reader: &BoundedReader<&'_ F>, range: &BlobRange) -> Result<Vec<u8>> {
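BlobRange's fields become pub and the struct derives Clone so that ImageLayer (below) can store ranges in its metadata map and hand out copies. The body of read_blob is not shown in this diff; a minimal sketch of what it presumably does, assuming the BoundedReader supports positioned reads the way the chapter readers elsewhere in this diff do:

    // Sketch only, not the actual body from blob.rs: read `range.size` bytes
    // starting at `range.offset` within the bounded reader.
    pub fn read_blob<F: FileExt>(reader: &BoundedReader<&'_ F>, range: &BlobRange) -> Result<Vec<u8>> {
        let mut buf = vec![0u8; range.size];
        reader.read_exact_at(&mut buf, range.offset)?;
        Ok(buf)
    }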

View File: pageserver/src/layered_repository/delta_layer.rs (inferred)

@@ -119,16 +119,16 @@ impl From<&DeltaLayer> for Summary {
pub struct DeltaLayer {
path_or_conf: PathOrConf,
- pub tenantid: ZTenantId,
- pub timelineid: ZTimelineId,
- pub seg: SegmentTag,
+ tenantid: ZTenantId,
+ timelineid: ZTimelineId,
+ seg: SegmentTag,
//
// This entry contains all the changes from 'start_lsn' to 'end_lsn'. The
// start is inclusive, and end is exclusive.
//
- pub start_lsn: Lsn,
- pub end_lsn: Lsn,
+ start_lsn: Lsn,
+ end_lsn: Lsn,
dropped: bool,
@@ -144,17 +144,15 @@ pub struct DeltaLayerInner {
/// All versions of all pages in the file are kept here.
/// Indexed by block number and LSN.
- page_version_metas: VecMap<(u32, Lsn), BlobRange>,
+ page_version_metas: VecMap<(SegmentTag, u32, Lsn), BlobRange>,
/// `relsizes` tracks the size of the relation at different points in time.
- relsizes: VecMap<Lsn, u32>,
+ relsizes: VecMap<(SegmentTag, Lsn), u32>,
}
impl DeltaLayerInner {
- fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
- let slice = self
- .relsizes
- .slice_range((Included(&Lsn(0)), Included(&lsn)));
+ fn get_seg_size(&self, seg: SegmentTag, lsn: Lsn) -> Result<u32> {
+ let slice = self.relsizes.slice_range((seg, Lsn(0))..=(seg, lsn));
if let Some((_entry_lsn, entry)) = slice.last() {
Ok(*entry)
} else {
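relsizes is now keyed by (SegmentTag, Lsn) instead of Lsn alone, so one map can hold sizes for every segment in the layer, and a lookup narrows to a single segment by ranging over (seg, Lsn(0))..=(seg, lsn). The same tuple-key range technique over std's BTreeMap, as a self-contained sketch (VecMap is Zenith's sorted-vector map; plain integers stand in for SegmentTag and Lsn):

    use std::collections::BTreeMap;

    type Seg = u32; // stand-in for SegmentTag
    type Lsn = u64; // stand-in for zenith_utils::lsn::Lsn

    /// Latest recorded size of `seg` at or before `lsn`, if any. Works because
    /// tuple ordering compares the segment first and the LSN second.
    fn seg_size_at(relsizes: &BTreeMap<(Seg, Lsn), u32>, seg: Seg, lsn: Lsn) -> Option<u32> {
        relsizes.range((seg, 0)..=(seg, lsn)).next_back().map(|(_, v)| *v)
    }

    fn main() {
        let mut relsizes = BTreeMap::new();
        relsizes.insert((7, 10), 100);
        relsizes.insert((7, 20), 120);
        relsizes.insert((8, 5), 50); // another segment; never mixed into seg 7's range
        assert_eq!(seg_size_at(&relsizes, 7, 15), Some(100));
        assert_eq!(seg_size_at(&relsizes, 7, 25), Some(120));
        assert_eq!(seg_size_at(&relsizes, 9, 1), None);
    }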
@@ -195,6 +193,7 @@ impl Layer for DeltaLayer {
/// Look up given page in the cache.
fn get_page_reconstruct_data(
&self,
+ seg: SegmentTag,
blknum: u32,
lsn: Lsn,
cached_img_lsn: Option<Lsn>,
@@ -202,7 +201,7 @@ impl Layer for DeltaLayer {
) -> Result<PageReconstructResult> {
let mut need_image = true;
- assert!(self.seg.blknum_in_seg(blknum));
+ assert!(seg.blknum_in_seg(blknum));
match &cached_img_lsn {
Some(cached_lsn) if &self.end_lsn <= cached_lsn => {
@@ -221,14 +220,14 @@ impl Layer for DeltaLayer {
.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
// Scan the metadata BTreeMap backwards, starting from the given entry.
- let minkey = (blknum, Lsn(0));
- let maxkey = (blknum, lsn);
+ let minkey = (seg, blknum, Lsn(0));
+ let maxkey = (seg, blknum, lsn);
let iter = inner
.page_version_metas
.slice_range((Included(&minkey), Included(&maxkey)))
.iter()
.rev();
- for ((_blknum, pv_lsn), blob_range) in iter {
+ for ((_seg, _blknum, pv_lsn), blob_range) in iter {
match &cached_img_lsn {
Some(cached_lsn) if pv_lsn <= cached_lsn => {
return Ok(PageReconstructResult::Cached)
@@ -260,8 +259,8 @@ impl Layer for DeltaLayer {
// If we didn't find any records for this, check if the request is beyond EOF
if need_image
&& reconstruct_data.records.is_empty()
- && self.seg.rel.is_blocky()
- && blknum - self.seg.segno * RELISH_SEG_SIZE >= inner.get_seg_size(lsn)?
+ && seg.rel.is_blocky()
+ && blknum - seg.segno * RELISH_SEG_SIZE >= inner.get_seg_size(seg, lsn)?
{
return Ok(PageReconstructResult::Missing(self.start_lsn));
}
@@ -279,7 +278,7 @@ impl Layer for DeltaLayer {
}
/// Get size of the relation at given LSN
- fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
+ fn get_seg_size(&self, seg: SegmentTag, lsn: Lsn) -> Result<u32> {
assert!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
@@ -288,11 +287,13 @@ impl Layer for DeltaLayer {
// Scan the BTreeMap backwards, starting from the given entry.
let inner = self.load()?;
- inner.get_seg_size(lsn)
+ inner.get_seg_size(seg, lsn)
}
/// Does this segment exist at given LSN?
- fn get_seg_exists(&self, lsn: Lsn) -> Result<bool> {
+ fn get_seg_exists(&self, seg: SegmentTag, lsn: Lsn) -> Result<bool> {
+ assert_eq!(self.seg, seg, "range get_seg_exists not supported"); // TODO
// Is the requested LSN after the rel was dropped?
if self.dropped && lsn >= self.end_lsn {
return Ok(false);
@@ -342,8 +343,8 @@ impl Layer for DeltaLayer {
println!("--- relsizes ---");
let inner = self.load()?;
- for (k, v) in inner.relsizes.as_slice() {
- println!(" {}: {}", k, v);
+ for ((seg, lsn), v) in inner.relsizes.as_slice() {
+ println!(" {}@{}: {}", seg, lsn, v);
}
println!("--- page versions ---");
@@ -352,12 +353,14 @@ impl Layer for DeltaLayer {
let book = Book::new(file)?;
let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
- for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
+ for ((seg, blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
let mut desc = String::new();
let buf = read_blob(&chapter, blob_range)?;
let pv = PageVersion::des(&buf)?;
+ write!(&mut desc, "{}", seg)?;
match pv {
PageVersion::Page(img) => {
write!(&mut desc, " img {} bytes", img.len())?;
@@ -414,12 +417,20 @@ impl DeltaLayer {
dropped: bool,
page_versions: &PageVersions,
cutoff: Option<Lsn>,
- relsizes: VecMap<Lsn, u32>,
+ relsizes: &[(Lsn, u32)],
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
assert!(!relsizes.is_empty());
}
+ let relsizes = {
+ let mut m = VecMap::default();
+ for &(lsn, size) in relsizes {
+ m.append((seg, lsn), size).unwrap();
+ }
+ m
+ };
let delta_layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -459,7 +470,7 @@ impl DeltaLayer {
inner
.page_version_metas
- .append((blknum, lsn), blob_range)
+ .append((seg, blknum, lsn), blob_range)
.unwrap();
}
@@ -573,7 +584,7 @@ impl DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
tenantid,
- seg: filename.seg,
+ seg: filename.start_seg,
start_lsn: filename.start_lsn,
end_lsn: filename.end_lsn,
dropped: filename.dropped,
@@ -615,7 +626,11 @@ impl DeltaLayer {
fn layer_name(&self) -> DeltaFileName {
DeltaFileName {
- seg: self.seg,
+ start_seg: self.seg,
+ end_seg: SegmentTag {
+ rel: self.seg.rel,
+ segno: self.seg.segno + 1,
+ },
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
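Note that layer_name still describes exactly one segment: the exclusive end tag is synthesized as the successor of self.seg, which also keeps the debug_assert!(start_seg < end_seg) in the new filename parser satisfied. The same construction recurs in image_layer.rs and inmemory_layer.rs below; a shared helper along these lines (hypothetical, not part of the diff) would remove the duplication:

    // Hypothetical helper, not in this change: the exclusive end of a
    // single-segment range is the next segno of the same relish.
    fn single_seg_range_end(seg: SegmentTag) -> SegmentTag {
        SegmentTag {
            rel: seg.rel,
            segno: seg.segno + 1,
        }
    }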

View File: pageserver/src/layered_repository/filename.rs (inferred)

@@ -5,7 +5,7 @@ use crate::layered_repository::storage_layer::SegmentTag;
use crate::relish::*;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
- use std::fmt;
+ use std::fmt::{self, Write};
use std::fs;
use std::path::PathBuf;
@@ -15,10 +15,109 @@ use zenith_utils::lsn::Lsn;
use super::metadata::METADATA_FILE_NAME;
+ fn parse_seg(input: &mut &str) -> Option<SegmentTag> {
+ let rel = if let Some(rest) = input.strip_prefix("rel_") {
+ let mut parts = rest.splitn(5, '_');
+ let rel = RelishTag::Relation(RelTag {
+ spcnode: parts.next()?.parse::<u32>().ok()?,
+ dbnode: parts.next()?.parse::<u32>().ok()?,
+ relnode: parts.next()?.parse::<u32>().ok()?,
+ forknum: parts.next()?.parse::<u8>().ok()?,
+ });
+ *input = parts.next()?;
+ debug_assert!(parts.next().is_none());
+ rel
+ } else if let Some(rest) = input.strip_prefix("pg_xact_") {
+ let (segno, rest) = rest.split_once('_')?;
+ *input = rest;
+ RelishTag::Slru {
+ slru: SlruKind::Clog,
+ segno: u32::from_str_radix(segno, 16).ok()?,
+ }
+ } else if let Some(rest) = input.strip_prefix("pg_multixact_members_") {
+ let (segno, rest) = rest.split_once('_')?;
+ *input = rest;
+ RelishTag::Slru {
+ slru: SlruKind::MultiXactMembers,
+ segno: u32::from_str_radix(segno, 16).ok()?,
+ }
+ } else if let Some(rest) = input.strip_prefix("pg_multixact_offsets_") {
+ let (segno, rest) = rest.split_once('_')?;
+ *input = rest;
+ RelishTag::Slru {
+ slru: SlruKind::MultiXactOffsets,
+ segno: u32::from_str_radix(segno, 16).ok()?,
+ }
+ } else if let Some(rest) = input.strip_prefix("pg_filenodemap_") {
+ let mut parts = rest.splitn(3, '_');
+ let rel = RelishTag::FileNodeMap {
+ spcnode: parts.next()?.parse::<u32>().ok()?,
+ dbnode: parts.next()?.parse::<u32>().ok()?,
+ };
+ *input = parts.next()?;
+ debug_assert!(parts.next().is_none());
+ rel
+ } else if let Some(rest) = input.strip_prefix("pg_twophase_") {
+ let (xid, rest) = rest.split_once('_')?;
+ *input = rest;
+ RelishTag::TwoPhase {
+ xid: xid.parse::<u32>().ok()?,
+ }
+ } else if let Some(rest) = input.strip_prefix("pg_control_checkpoint_") {
+ *input = rest;
+ RelishTag::Checkpoint
+ } else if let Some(rest) = input.strip_prefix("pg_control_") {
+ *input = rest;
+ RelishTag::ControlFile
+ } else {
+ return None;
+ };
+ let (segno, rest) = input.split_once('_')?;
+ *input = rest;
+ Some(SegmentTag {
+ rel,
+ segno: segno.parse().ok()?,
+ })
+ }
+ fn write_seg(seg: &SegmentTag) -> String {
+ let mut s = match seg.rel {
+ RelishTag::Relation(reltag) => format!(
+ "rel_{}_{}_{}_{}",
+ reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
+ ),
+ RelishTag::Slru {
+ slru: SlruKind::Clog,
+ segno,
+ } => format!("pg_xact_{:04X}", segno),
+ RelishTag::Slru {
+ slru: SlruKind::MultiXactMembers,
+ segno,
+ } => format!("pg_multixact_members_{:04X}", segno),
+ RelishTag::Slru {
+ slru: SlruKind::MultiXactOffsets,
+ segno,
+ } => format!("pg_multixact_offsets_{:04X}", segno),
+ RelishTag::FileNodeMap { spcnode, dbnode } => {
+ format!("pg_filenodemap_{}_{}", spcnode, dbnode)
+ }
+ RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
+ RelishTag::Checkpoint => "pg_control_checkpoint".to_string(),
+ RelishTag::ControlFile => "pg_control".to_string(),
+ };
+ write!(&mut s, "_{}", seg.segno).unwrap();
+ s
+ }
// Note: LayeredTimeline::load_layer_map() relies on this sort order
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct DeltaFileName {
- pub seg: SegmentTag,
+ pub start_seg: SegmentTag,
+ pub end_seg: SegmentTag,
pub start_lsn: Lsn,
pub end_lsn: Lsn,
pub dropped: bool,
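parse_seg works cursor-style: it strips one segment tag, including its trailing underscore, from the front of *input and advances the borrowed slice, so parse_str below can call it twice in a row for the start and end tags; write_seg is its inverse. A hypothetical round trip with made-up relation numbers:

    // Illustrative only; the values are invented.
    let mut input = "rel_1663_13990_2609_0_5_rel_1663_13990_2609_0_6_0000000000000020";
    let start_seg = parse_seg(&mut input).unwrap(); // segno 5
    let end_seg = parse_seg(&mut input).unwrap(); // segno 6
    assert!(start_seg < end_seg);
    assert_eq!(input, "0000000000000020"); // only the LSN part is left
    assert_eq!(write_seg(&start_seg), "rel_1663_13990_2609_0_5");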
@@ -38,59 +137,12 @@ impl DeltaFileName {
/// match the expected pattern.
///
pub fn parse_str(fname: &str) -> Option<Self> {
- let rel;
- let mut parts;
- if let Some(rest) = fname.strip_prefix("rel_") {
- parts = rest.split('_');
- rel = RelishTag::Relation(RelTag {
- spcnode: parts.next()?.parse::<u32>().ok()?,
- dbnode: parts.next()?.parse::<u32>().ok()?,
- relnode: parts.next()?.parse::<u32>().ok()?,
- forknum: parts.next()?.parse::<u8>().ok()?,
- });
- } else if let Some(rest) = fname.strip_prefix("pg_xact_") {
- parts = rest.split('_');
- rel = RelishTag::Slru {
- slru: SlruKind::Clog,
- segno: u32::from_str_radix(parts.next()?, 16).ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") {
- parts = rest.split('_');
- rel = RelishTag::Slru {
- slru: SlruKind::MultiXactMembers,
- segno: u32::from_str_radix(parts.next()?, 16).ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") {
- parts = rest.split('_');
- rel = RelishTag::Slru {
- slru: SlruKind::MultiXactOffsets,
- segno: u32::from_str_radix(parts.next()?, 16).ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") {
- parts = rest.split('_');
- rel = RelishTag::FileNodeMap {
- spcnode: parts.next()?.parse::<u32>().ok()?,
- dbnode: parts.next()?.parse::<u32>().ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_twophase_") {
- parts = rest.split('_');
- rel = RelishTag::TwoPhase {
- xid: parts.next()?.parse::<u32>().ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") {
- parts = rest.split('_');
- rel = RelishTag::Checkpoint;
- } else if let Some(rest) = fname.strip_prefix("pg_control_") {
- parts = rest.split('_');
- rel = RelishTag::ControlFile;
- } else {
- return None;
- }
- let segno = parts.next()?.parse::<u32>().ok()?;
- let seg = SegmentTag { rel, segno };
+ let mut rest = fname;
+ let start_seg = parse_seg(&mut rest)?;
+ let end_seg = parse_seg(&mut rest)?;
+ debug_assert!(start_seg < end_seg);
+ let mut parts = rest.split('_');
let start_lsn = Lsn::from_hex(parts.next()?).ok()?;
let end_lsn = Lsn::from_hex(parts.next()?).ok()?;
@@ -107,7 +159,8 @@ impl DeltaFileName {
}
Some(DeltaFileName {
- seg,
+ start_seg,
+ end_seg,
start_lsn,
end_lsn,
dropped,
@@ -117,36 +170,14 @@ impl DeltaFileName {
impl fmt::Display for DeltaFileName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- let basename = match self.seg.rel {
- RelishTag::Relation(reltag) => format!(
- "rel_{}_{}_{}_{}",
- reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
- ),
- RelishTag::Slru {
- slru: SlruKind::Clog,
- segno,
- } => format!("pg_xact_{:04X}", segno),
- RelishTag::Slru {
- slru: SlruKind::MultiXactMembers,
- segno,
- } => format!("pg_multixact_members_{:04X}", segno),
- RelishTag::Slru {
- slru: SlruKind::MultiXactOffsets,
- segno,
- } => format!("pg_multixact_offsets_{:04X}", segno),
- RelishTag::FileNodeMap { spcnode, dbnode } => {
- format!("pg_filenodemap_{}_{}", spcnode, dbnode)
- }
- RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
- RelishTag::Checkpoint => "pg_control_checkpoint".to_string(),
- RelishTag::ControlFile => "pg_control".to_string(),
- };
+ let start_seg = write_seg(&self.start_seg);
+ let end_seg = write_seg(&self.end_seg);
write!(
f,
"{}_{}_{:016X}_{:016X}{}",
- basename,
- self.seg.segno,
+ start_seg,
+ end_seg,
u64::from(self.start_lsn),
u64::from(self.end_lsn),
if self.dropped { "_DROPPED" } else { "" }
@@ -156,7 +187,8 @@ impl fmt::Display for DeltaFileName {
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct ImageFileName {
- pub seg: SegmentTag,
+ pub start_seg: SegmentTag,
+ pub end_seg: SegmentTag,
pub lsn: Lsn,
}
@@ -171,103 +203,31 @@ impl ImageFileName {
/// match the expected pattern.
///
pub fn parse_str(fname: &str) -> Option<Self> {
- let rel;
- let mut parts;
- if let Some(rest) = fname.strip_prefix("rel_") {
- parts = rest.split('_');
- rel = RelishTag::Relation(RelTag {
- spcnode: parts.next()?.parse::<u32>().ok()?,
- dbnode: parts.next()?.parse::<u32>().ok()?,
- relnode: parts.next()?.parse::<u32>().ok()?,
- forknum: parts.next()?.parse::<u8>().ok()?,
- });
- } else if let Some(rest) = fname.strip_prefix("pg_xact_") {
- parts = rest.split('_');
- rel = RelishTag::Slru {
- slru: SlruKind::Clog,
- segno: u32::from_str_radix(parts.next()?, 16).ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") {
- parts = rest.split('_');
- rel = RelishTag::Slru {
- slru: SlruKind::MultiXactMembers,
- segno: u32::from_str_radix(parts.next()?, 16).ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") {
- parts = rest.split('_');
- rel = RelishTag::Slru {
- slru: SlruKind::MultiXactOffsets,
- segno: u32::from_str_radix(parts.next()?, 16).ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") {
- parts = rest.split('_');
- rel = RelishTag::FileNodeMap {
- spcnode: parts.next()?.parse::<u32>().ok()?,
- dbnode: parts.next()?.parse::<u32>().ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_twophase_") {
- parts = rest.split('_');
- rel = RelishTag::TwoPhase {
- xid: parts.next()?.parse::<u32>().ok()?,
- };
- } else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") {
- parts = rest.split('_');
- rel = RelishTag::Checkpoint;
- } else if let Some(rest) = fname.strip_prefix("pg_control_") {
- parts = rest.split('_');
- rel = RelishTag::ControlFile;
- } else {
+ let mut rest = fname;
+ let start_seg = parse_seg(&mut rest)?;
+ let end_seg = parse_seg(&mut rest)?;
+ debug_assert!(start_seg < end_seg);
+ if rest.contains('_') {
+ return None;
+ }
- let segno = parts.next()?.parse::<u32>().ok()?;
+ let lsn = Lsn::from_hex(rest).ok()?;
- let seg = SegmentTag { rel, segno };
- let lsn = Lsn::from_hex(parts.next()?).ok()?;
- if parts.next().is_some() {
- return None;
- }
- Some(ImageFileName { seg, lsn })
+ Some(ImageFileName {
+ start_seg,
+ end_seg,
+ lsn,
+ })
}
}
impl fmt::Display for ImageFileName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- let basename = match self.seg.rel {
- RelishTag::Relation(reltag) => format!(
- "rel_{}_{}_{}_{}",
- reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
- ),
- RelishTag::Slru {
- slru: SlruKind::Clog,
- segno,
- } => format!("pg_xact_{:04X}", segno),
- RelishTag::Slru {
- slru: SlruKind::MultiXactMembers,
- segno,
- } => format!("pg_multixact_members_{:04X}", segno),
- RelishTag::Slru {
- slru: SlruKind::MultiXactOffsets,
- segno,
- } => format!("pg_multixact_offsets_{:04X}", segno),
- RelishTag::FileNodeMap { spcnode, dbnode } => {
- format!("pg_filenodemap_{}_{}", spcnode, dbnode)
- }
- RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
- RelishTag::Checkpoint => "pg_control_checkpoint".to_string(),
- RelishTag::ControlFile => "pg_control".to_string(),
- };
+ let start_seg = write_seg(&self.start_seg);
+ let end_seg = write_seg(&self.end_seg);
- write!(
- f,
- "{}_{}_{:016X}",
- basename,
- self.seg.segno,
- u64::from(self.lsn),
- )
+ write!(f, "{}_{}_{:016X}", start_seg, end_seg, u64::from(self.lsn),)
}
}

View File: pageserver/src/layered_repository/image_layer.rs (inferred)

@@ -21,6 +21,7 @@
//!
//! For non-blocky relishes, the image can be found in NONBLOCKY_IMAGE_CHAPTER.
//!
+ use crate::layered_repository::blob::read_blob;
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag,
@@ -39,18 +40,20 @@ use std::fs;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::{Mutex, MutexGuard};
+ use zenith_utils::vec_map::VecMap;
use bookfile::{Book, BookWriter};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
+ use super::blob::BlobRange;
// Magic constant to identify a Zenith segment image file
pub const IMAGE_FILE_MAGIC: u32 = 0x5A616E01 + 1;
/// Contains each block in block # order
- const BLOCKY_IMAGES_CHAPTER: u64 = 1;
- const NONBLOCKY_IMAGE_CHAPTER: u64 = 2;
+ const BLOB_CHAPTER: u64 = 4;
+ const META_CHAPTER: u64 = 5;
/// Contains the [`Summary`] struct
const SUMMARY_CHAPTER: u64 = 3;
@@ -87,28 +90,31 @@ const BLOCK_SIZE: usize = 8192;
///
pub struct ImageLayer {
path_or_conf: PathOrConf,
- pub tenantid: ZTenantId,
- pub timelineid: ZTimelineId,
- pub seg: SegmentTag,
+ tenantid: ZTenantId,
+ timelineid: ZTimelineId,
+ seg: SegmentTag,
// This entry contains an image of all pages as of this LSN
- pub lsn: Lsn,
+ lsn: Lsn,
inner: Mutex<ImageLayerInner>,
}
- #[derive(Clone)]
- enum ImageType {
- Blocky { num_blocks: u32 },
- NonBlocky,
- }
pub struct ImageLayerInner {
/// If None, the 'image_type' has not been loaded into memory yet.
book: Option<Book<VirtualFile>>,
/// Derived from filename and bookfile chapter metadata
- image_type: ImageType,
+ meta: VecMap<SegmentTag, BlobRange>,
}
+ impl ImageLayerInner {
+ fn get_seg_blob_range(&self, seg: SegmentTag) -> Result<BlobRange> {
+ self.meta
+ .as_slice()
+ .binary_search_by_key(&&seg, |(seg, _meta)| seg)
+ .map(|idx| self.meta.as_slice()[idx].1.clone())
+ .map_err(|_| anyhow!("segment not found in ImageLayer"))
+ }
+ }
impl Layer for ImageLayer {
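get_seg_blob_range leans on VecMap::append keeping the underlying slice sorted by key, which is what makes a binary search by SegmentTag valid. The lookup in isolation, as a runnable sketch over a plain sorted slice (any Ord key plays the role of SegmentTag):

    // Sketch of the lookup technique used by get_seg_blob_range.
    fn find_meta<K: Ord, V: Clone>(meta: &[(K, V)], key: &K) -> Option<V> {
        meta.binary_search_by_key(&key, |(k, _v)| k)
            .ok()
            .map(|idx| meta[idx].1.clone())
    }

    fn main() {
        let meta = [(1u32, "a"), (3, "b"), (7, "c")]; // must stay sorted by key
        assert_eq!(find_meta(&meta, &3), Some("b"));
        assert_eq!(find_meta(&meta, &4), None);
    }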
@@ -144,6 +150,7 @@ impl Layer for ImageLayer {
/// Look up given page in the file
fn get_page_reconstruct_data(
&self,
+ seg: SegmentTag,
blknum: u32,
lsn: Lsn,
cached_img_lsn: Option<Lsn>,
@@ -160,34 +167,29 @@ impl Layer for ImageLayer {
let base_blknum = blknum % RELISH_SEG_SIZE;
- let buf = match &inner.image_type {
- ImageType::Blocky { num_blocks } => {
- // Check if the request is beyond EOF
- if base_blknum >= *num_blocks {
- return Ok(PageReconstructResult::Missing(lsn));
- }
+ let blob_range = inner.get_seg_blob_range(seg)?;
- let mut buf = vec![0u8; BLOCK_SIZE];
- let offset = BLOCK_SIZE as u64 * base_blknum as u64;
+ let chapter = inner.book.as_ref().unwrap().chapter_reader(BLOB_CHAPTER)?;
- let chapter = inner
- .book
- .as_ref()
- .unwrap()
- .chapter_reader(BLOCKY_IMAGES_CHAPTER)?;
- chapter.read_exact_at(&mut buf, offset)?;
- buf
- }
- ImageType::NonBlocky => {
- ensure!(base_blknum == 0);
- inner
- .book
- .as_ref()
- .unwrap()
- .read_chapter(NONBLOCKY_IMAGE_CHAPTER)?
- .into_vec()
+ let buf = if seg.rel.is_blocky() {
+ // Check if the request is beyond EOF
+ if base_blknum >= get_num_blocks(&blob_range) {
+ return Ok(PageReconstructResult::Missing(lsn));
+ }
+ let mut buf = vec![0u8; BLOCK_SIZE];
+ let block_offset = BLOCK_SIZE as u64 * base_blknum as u64;
+ assert!(block_offset + BLOCK_SIZE as u64 <= blob_range.size as u64);
+ let offset = blob_range.offset + block_offset;
+ chapter.read_exact_at(&mut buf, offset)?;
+ buf
+ } else {
+ ensure!(base_blknum == 0);
+ read_blob(&chapter, &blob_range)?
+ };
reconstruct_data.page_img = Some(Bytes::from(buf));
@@ -195,17 +197,26 @@ impl Layer for ImageLayer {
}
/// Get size of the segment
- fn get_seg_size(&self, _lsn: Lsn) -> Result<u32> {
- let inner = self.load()?;
- match inner.image_type {
- ImageType::Blocky { num_blocks } => Ok(num_blocks),
- ImageType::NonBlocky => Err(anyhow!("get_seg_size called for non-blocky segment")),
+ fn get_seg_size(&self, seg: SegmentTag, _lsn: Lsn) -> Result<u32> {
+ if !self.seg.rel.is_blocky() {
+ bail!("get_seg_size called for non-blocky segment");
+ }
+ let inner = self.load()?;
+ let blob_range = inner.get_seg_blob_range(seg)?;
+ Ok(get_num_blocks(&blob_range))
}
/// Does this segment exist at given LSN?
- fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
- Ok(true)
+ fn get_seg_exists(&self, seg: SegmentTag, _lsn: Lsn) -> Result<bool> {
+ let inner = self.load()?;
+ Ok(inner
+ .meta
+ .as_slice()
+ .binary_search_by_key(&&seg, |(seg, _meta)| seg)
+ .is_ok())
}
fn unload(&self) -> Result<()> {
@@ -235,15 +246,11 @@ impl Layer for ImageLayer {
let inner = self.load()?;
- match inner.image_type {
- ImageType::Blocky { num_blocks } => println!("({}) blocks ", num_blocks),
- ImageType::NonBlocky => {
- let chapter = inner
- .book
- .as_ref()
- .unwrap()
- .read_chapter(NONBLOCKY_IMAGE_CHAPTER)?;
- println!("non-blocky ({} bytes)", chapter.len());
+ for (seg, blob_range) in inner.meta.as_slice() {
+ if seg.rel.is_blocky() {
+ println!("{} ({}) blocks ", seg, get_num_blocks(blob_range));
+ } else {
+ println!("{} non-blocky ({} bytes)", seg, blob_range.size);
+ }
}
@@ -275,15 +282,7 @@ impl ImageLayer {
lsn: Lsn,
base_images: Vec<Bytes>,
) -> Result<ImageLayer> {
- let image_type = if seg.rel.is_blocky() {
- let num_blocks: u32 = base_images.len().try_into()?;
- ImageType::Blocky { num_blocks }
- } else {
- assert_eq!(base_images.len(), 1);
- ImageType::NonBlocky
- };
- let layer = ImageLayer {
+ let mut layer = ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
tenantid,
@@ -291,10 +290,9 @@ impl ImageLayer {
lsn,
inner: Mutex::new(ImageLayerInner {
book: None,
- image_type: image_type.clone(),
+ meta: VecMap::default(),
}),
};
- let inner = layer.inner.lock().unwrap();
// Write the images into a file
//
@@ -309,22 +307,33 @@ impl ImageLayer {
let buf_writer = BufWriter::new(file);
let book = BookWriter::new(buf_writer, IMAGE_FILE_MAGIC)?;
- let book = match &image_type {
- ImageType::Blocky { .. } => {
- let mut chapter = book.new_chapter(BLOCKY_IMAGES_CHAPTER);
- for block_bytes in base_images {
- assert_eq!(block_bytes.len(), BLOCK_SIZE);
- chapter.write_all(&block_bytes)?;
- }
- chapter.close()?
- }
- ImageType::NonBlocky => {
- let mut chapter = book.new_chapter(NONBLOCKY_IMAGE_CHAPTER);
- chapter.write_all(&base_images[0])?;
- chapter.close()?
+ let mut blob_chapter = book.new_chapter(BLOB_CHAPTER);
+ let size = if seg.rel.is_blocky() {
+ for block_bytes in &base_images {
+ assert_eq!(block_bytes.len(), BLOCK_SIZE);
+ blob_chapter.write_all(block_bytes)?;
+ }
+ BLOCK_SIZE * base_images.len()
+ } else {
+ assert_eq!(base_images.len(), 1);
+ blob_chapter.write_all(&base_images[0])?;
+ base_images[0].len()
+ };
+ let book = blob_chapter.close()?;
+ let inner = layer.inner.get_mut().unwrap();
+ inner
+ .meta
+ .append(seg, BlobRange { offset: 0, size })
+ .unwrap();
+ let mut meta_chapter = book.new_chapter(META_CHAPTER);
+ inner.meta.ser_into(&mut meta_chapter)?;
+ let book = meta_chapter.close()?;
let mut chapter = book.new_chapter(SUMMARY_CHAPTER);
let summary = Summary {
tenantid,
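The write path now emits one blob chapter holding every image back to back, a metadata chapter with the serialized VecMap<SegmentTag, BlobRange> locating each segment inside it (today a single entry at offset 0), and the existing summary chapter. A sketch of reading one segment back out of such a file, composed only of calls that appear elsewhere in this diff:

    // Sketch, not code from the commit: locate a segment via META_CHAPTER,
    // then pull its bytes out of BLOB_CHAPTER.
    fn read_seg_image(book: &Book<VirtualFile>, seg: SegmentTag) -> Result<Vec<u8>> {
        let meta: VecMap<SegmentTag, BlobRange> = VecMap::des(&book.read_chapter(META_CHAPTER)?)?;
        let idx = meta
            .as_slice()
            .binary_search_by_key(&&seg, |(seg, _)| seg)
            .map_err(|_| anyhow!("segment not found in ImageLayer"))?;
        let blob_range = meta.as_slice()[idx].1.clone();
        let chapter = book.chapter_reader(BLOB_CHAPTER)?;
        read_blob(&chapter, &blob_range)
    }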
@@ -342,8 +351,6 @@ impl ImageLayer {
trace!("saved {}", path.display());
- drop(inner);
Ok(layer)
}
@@ -355,13 +362,14 @@ impl ImageLayer {
src: &dyn Layer,
lsn: Lsn,
) -> Result<ImageLayer> {
+ // TODO needs to become an image of all segments in the layer
let seg = src.get_seg_tag();
let timelineid = timeline.timelineid;
let startblk;
let size;
if seg.rel.is_blocky() {
- size = src.get_seg_size(lsn)?;
+ size = src.get_seg_size(seg, lsn)?;
startblk = seg.segno * RELISH_SEG_SIZE;
} else {
size = 1;
@@ -431,22 +439,13 @@ impl ImageLayer {
}
}
- let image_type = if self.seg.rel.is_blocky() {
- let chapter = book.chapter_reader(BLOCKY_IMAGES_CHAPTER)?;
- let images_len = chapter.len();
- ensure!(images_len % BLOCK_SIZE as u64 == 0);
- let num_blocks: u32 = (images_len / BLOCK_SIZE as u64).try_into()?;
- ImageType::Blocky { num_blocks }
- } else {
- let _chapter = book.chapter_reader(NONBLOCKY_IMAGE_CHAPTER)?;
- ImageType::NonBlocky
- };
+ let meta = VecMap::des(&book.read_chapter(META_CHAPTER)?)?;
debug!("loaded from {}", &path.display());
*inner = ImageLayerInner {
book: Some(book),
- image_type,
+ meta,
};
Ok(inner)
@@ -463,11 +462,11 @@ impl ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
tenantid,
- seg: filename.seg,
+ seg: filename.start_seg,
lsn: filename.lsn,
inner: Mutex::new(ImageLayerInner {
book: None,
- image_type: ImageType::Blocky { num_blocks: 0 },
+ meta: VecMap::default(),
}),
}
}
@@ -490,14 +489,18 @@ impl ImageLayer {
lsn: summary.lsn,
inner: Mutex::new(ImageLayerInner {
book: None,
- image_type: ImageType::Blocky { num_blocks: 0 },
+ meta: VecMap::default(),
}),
})
}
fn layer_name(&self) -> ImageFileName {
ImageFileName {
- seg: self.seg,
+ start_seg: self.seg,
+ end_seg: SegmentTag {
+ rel: self.seg.rel,
+ segno: self.seg.segno + 1,
+ },
lsn: self.lsn,
}
}
@@ -512,3 +515,9 @@ impl ImageLayer {
)
}
}
+ /// Must only be called for blob ranges of blocky relishes.
+ fn get_num_blocks(blob_range: &BlobRange) -> u32 {
+ assert_eq!(blob_range.size % BLOCK_SIZE, 0);
+ (blob_range.size / BLOCK_SIZE).try_into().unwrap()
+ }

View File: pageserver/src/layered_repository/inmemory_layer.rs (inferred)

@@ -106,7 +106,11 @@ impl Layer for InMemoryLayer {
}
let delta_filename = DeltaFileName {
- seg: self.seg,
+ start_seg: self.seg,
+ end_seg: SegmentTag {
+ rel: self.seg.rel,
+ segno: self.seg.segno + 1,
+ },
start_lsn: self.start_lsn,
end_lsn,
dropped: inner.dropped,
@@ -150,11 +154,14 @@ impl Layer for InMemoryLayer {
/// Look up given page in the cache.
fn get_page_reconstruct_data(
&self,
+ seg: SegmentTag,
blknum: u32,
lsn: Lsn,
cached_img_lsn: Option<Lsn>,
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult> {
+ assert_eq!(self.seg, seg); // TODO
let mut need_image = true;
assert!(self.seg.blknum_in_seg(blknum));
@@ -198,7 +205,7 @@ impl Layer for InMemoryLayer {
if need_image
&& reconstruct_data.records.is_empty()
&& self.seg.rel.is_blocky()
- && blknum - self.seg.segno * RELISH_SEG_SIZE >= self.get_seg_size(lsn)?
+ && blknum - self.seg.segno * RELISH_SEG_SIZE >= self.get_seg_size(seg, lsn)?
{
return Ok(PageReconstructResult::Missing(self.start_lsn));
}
@@ -220,7 +227,9 @@ impl Layer for InMemoryLayer {
}
/// Get size of the relation at given LSN
- fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
+ fn get_seg_size(&self, seg: SegmentTag, lsn: Lsn) -> Result<u32> {
+ assert_eq!(self.seg, seg);
assert!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
@@ -232,7 +241,9 @@ impl Layer for InMemoryLayer {
}
/// Does this segment exist at given LSN?
- fn get_seg_exists(&self, lsn: Lsn) -> Result<bool> {
+ fn get_seg_exists(&self, seg: SegmentTag, lsn: Lsn) -> Result<bool> {
+ assert_eq!(self.seg, seg);
let inner = self.inner.read().unwrap();
// If the segment created after requested LSN,
@@ -517,7 +528,7 @@ impl InMemoryLayer {
// Copy the segment size at the start LSN from the predecessor layer.
let mut segsizes = VecMap::default();
if seg.rel.is_blocky() {
- let size = src.get_seg_size(start_lsn)?;
+ let size = src.get_seg_size(seg, start_lsn)?;
segsizes.append(start_lsn, size).unwrap();
}
@@ -606,7 +617,7 @@ impl InMemoryLayer {
true,
&inner.page_versions,
None,
- inner.segsizes.clone(),
+ inner.segsizes.as_slice(),
)?;
trace!(
"freeze: created delta layer for dropped segment {} {}-{}",
@@ -640,7 +651,7 @@ impl InMemoryLayer {
false,
&inner.page_versions,
Some(end_lsn_inclusive),
- segsizes,
+ segsizes.as_slice(), // TODO avoid copy above
)?;
delta_layers.push(delta_layer);
trace!(

View File: pageserver/src/layered_repository/layer_map.rs (inferred)

@@ -169,7 +169,7 @@ impl LayerMap {
if (request_rel.spcnode == 0 || reltag.spcnode == request_rel.spcnode)
&& (request_rel.dbnode == 0 || reltag.dbnode == request_rel.dbnode)
{
- if let Some(exists) = segentry.exists_at_lsn(lsn)? {
+ if let Some(exists) = segentry.exists_at_lsn(*seg, lsn)? {
rels.insert(seg.rel, exists);
}
}
@@ -177,7 +177,7 @@ impl LayerMap {
}
_ => {
if tag == None {
- if let Some(exists) = segentry.exists_at_lsn(lsn)? {
+ if let Some(exists) = segentry.exists_at_lsn(*seg, lsn)? {
rels.insert(seg.rel, exists);
}
}
@@ -207,7 +207,7 @@ impl LayerMap {
/// to avoid incorrectly making it visible.
pub fn layer_exists_at_lsn(&self, seg: SegmentTag, lsn: Lsn) -> Result<bool> {
Ok(if let Some(segentry) = self.segs.get(&seg) {
- segentry.exists_at_lsn(lsn)?.unwrap_or(false)
+ segentry.exists_at_lsn(seg, lsn)?.unwrap_or(false)
} else {
false
})
@@ -292,9 +292,9 @@ struct SegEntry {
impl SegEntry {
/// Does the segment exist at given LSN?
/// Return None if object is not found in this SegEntry.
- fn exists_at_lsn(&self, lsn: Lsn) -> Result<Option<bool>> {
+ fn exists_at_lsn(&self, seg: SegmentTag, lsn: Lsn) -> Result<Option<bool>> {
if let Some(layer) = self.get(lsn) {
- Ok(Some(layer.get_seg_exists(lsn)?))
+ Ok(Some(layer.get_seg_exists(seg, lsn)?))
} else {
Ok(None)
}

View File: pageserver/src/layered_repository/storage_layer.rs (inferred)

@@ -139,6 +139,7 @@ pub trait Layer: Send + Sync {
/// to collect more data.
fn get_page_reconstruct_data(
&self,
+ seg: SegmentTag,
blknum: u32,
lsn: Lsn,
cached_img_lsn: Option<Lsn>,
@@ -146,10 +147,10 @@ pub trait Layer: Send + Sync {
) -> Result<PageReconstructResult>;
/// Return size of the segment at given LSN. (Only for blocky relations.)
- fn get_seg_size(&self, lsn: Lsn) -> Result<u32>;
+ fn get_seg_size(&self, seg: SegmentTag, lsn: Lsn) -> Result<u32>;
/// Does the segment exist at given LSN? Or was it dropped before it.
- fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
+ fn get_seg_exists(&self, seg: SegmentTag, lsn: Lsn) -> Result<bool>;
/// Does this layer only contain some data for the segment (incremental),
/// or does it contain a version of every page? This is important to know

View File: pageserver/src/repository.rs (inferred)

@@ -993,12 +993,17 @@ mod tests {
let tline_dir = harness.conf.timeline_path(&TIMELINE_ID, &harness.tenant_id);
let expected_image_layer_path = tline_dir.join(format!(
"rel_{}_{}_{}_{}_{}_{:016X}_{:016X}",
"rel_{}_{}_{}_{}_{}_rel_{}_{}_{}_{}_{}_{:016X}_{:016X}",
TESTREL_A_REL_TAG.spcnode,
TESTREL_A_REL_TAG.dbnode,
TESTREL_A_REL_TAG.relnode,
TESTREL_A_REL_TAG.forknum,
0, // seg is 0
+ TESTREL_A_REL_TAG.spcnode,
+ TESTREL_A_REL_TAG.dbnode,
+ TESTREL_A_REL_TAG.relnode,
+ TESTREL_A_REL_TAG.forknum,
+ 1, // end seg is 1
0x20,
0x30,
));
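For concreteness: if TESTREL_A_REL_TAG were (spcnode 1663, dbnode 13990, relnode 2609, forknum 0), hypothetical values, the expected layer file name would render as

    rel_1663_13990_2609_0_0_rel_1663_13990_2609_0_1_0000000000000020_0000000000000030

that is, start seg 0, exclusive end seg 1, and the LSNs 0x20 and 0x30 zero-padded to 16 hex digits.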
@@ -1146,13 +1151,13 @@ mod tests {
// These files are considered to be in the future and will be renamed out
// of the way
let future_filenames = vec![
format!("pg_control_0_{:016X}", 0x8001),
format!("pg_control_0_{:016X}_{:016X}", 0x8001, 0x8008),
format!("pg_control_0_pg_control_1_{:016X}", 0x8001),
format!("pg_control_0_pg_control_1_{:016X}_{:016X}", 0x8001, 0x8008),
];
// But these are not:
let past_filenames = vec![
format!("pg_control_0_{:016X}", 0x8000),
format!("pg_control_0_{:016X}_{:016X}", 0x7000, 0x8001),
format!("pg_control_0_pg_control_1_{:016X}", 0x8000),
format!("pg_control_0_pg_control_1_{:016X}_{:016X}", 0x7000, 0x8001),
];
for filename in future_filenames.iter().chain(past_filenames.iter()) {