Use binary format for PageVersion

Still lots of work to do here. Also a lot of copying going on that
should be avoided.
This commit is contained in:
Patrick Insinger
2021-11-04 00:24:37 -07:00
parent 445288d7c1
commit b26b471250
10 changed files with 124 additions and 61 deletions

View File

@@ -1601,7 +1601,7 @@ impl LayeredTimeline {
rel,
request_lsn
);
Ok(img.clone())
Ok(Bytes::from(img.image().to_vec()))
} else {
// FIXME: this ought to be an error?
warn!("Page {} blk {} at {} not found", rel, blknum, request_lsn);
@@ -1612,7 +1612,7 @@ impl LayeredTimeline {
//
// If we don't have a base image, then the oldest WAL record better initialize
// the page
if data.page_img.is_none() && !data.records.first().unwrap().1.will_init {
if data.page_img.is_none() && !data.records.first().unwrap().1.will_init() {
// FIXME: this ought to be an error?
warn!(
"Base image for page {}/{} at {} not found, but got {} WAL records",
@@ -1632,7 +1632,7 @@ impl LayeredTimeline {
rel,
blknum,
request_lsn,
data.page_img.clone(),
data.page_img.map(|page| Bytes::from(page.image().to_vec())), // FIXME
data.records,
)?;

View File

@@ -10,10 +10,10 @@ pub struct BlobRange {
size: usize,
}
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> {
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Box<[u8]>> {
let mut buf = vec![0u8; range.size];
reader.read_exact_at(&mut buf, range.offset)?;
Ok(buf)
Ok(buf.into_boxed_slice())
}
pub struct BlobWriter<W> {

View File

@@ -47,6 +47,7 @@ use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, ensure, Result};
use bytes::Bytes;
use log::*;
use serde::{Deserialize, Serialize};
use zenith_utils::vec_map::VecMap;
@@ -204,7 +205,8 @@ impl Layer for DeltaLayer {
.iter()
.rev();
for ((_blknum, pv_lsn), blob_range) in iter {
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
let pv_bytes = read_blob(&page_version_reader, blob_range)?;
let pv = PageVersion::from_bytes(pv_bytes);
match pv {
PageVersion::Page(img) => {
@@ -214,7 +216,7 @@ impl Layer for DeltaLayer {
break;
}
PageVersion::Wal(rec) => {
let will_init = rec.will_init;
let will_init = rec.will_init();
reconstruct_data.records.push((*pv_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
@@ -310,19 +312,20 @@ impl Layer for DeltaLayer {
let mut desc = String::new();
let buf = read_blob(&chapter, blob_range)?;
let pv = PageVersion::des(&buf)?;
let pv = PageVersion::from_bytes(buf);
match pv {
PageVersion::Page(img) => {
write!(&mut desc, " img {} bytes", img.len())?;
PageVersion::Page(page) => {
write!(&mut desc, " img {} bytes", page.image().len())?;
}
PageVersion::Wal(rec) => {
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
let wal_desc =
waldecoder::describe_wal_record(&Bytes::from(rec.rec().to_vec()));
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
rec.rec.len(),
rec.will_init,
rec.rec().len(),
rec.will_init(),
wal_desc
)?;
}

View File

@@ -23,7 +23,7 @@
//!
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag,
Layer, Page, PageReconstructData, PageReconstructResult, SegmentTag,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::RELISH_SEG_SIZE;
@@ -177,7 +177,7 @@ impl Layer for ImageLayer {
}
};
reconstruct_data.page_img = Some(Bytes::from(buf));
reconstruct_data.page_img = Some(Page::from_bytes(&buf));
Ok(PageReconstructResult::Complete)
}

View File

@@ -8,7 +8,6 @@ use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::ZERO_PAGE;
use crate::layered_repository::{DeltaLayer, ImageLayer};
use crate::repository::WALRecord;
use crate::PageServerConf;
@@ -24,6 +23,7 @@ use zenith_utils::vec_map::VecMap;
use zenith_utils::lsn::Lsn;
use super::page_versions::PageVersions;
use super::storage_layer::Page;
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -186,14 +186,15 @@ impl Layer for InMemoryLayer {
.rev();
for (entry_lsn, token) in iter {
match inner.page_versions.get_page_version(token)? {
PageVersion::Page(img) => {
reconstruct_data.page_img = Some(img);
PageVersion::Page(page) => {
reconstruct_data.page_img = Some(page);
need_image = false;
break;
}
PageVersion::Wal(rec) => {
reconstruct_data.records.push((*entry_lsn, rec.clone()));
if rec.will_init {
let will_init = rec.will_init();
reconstruct_data.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
@@ -361,7 +362,7 @@ impl InMemoryLayer {
/// Remember new page version, as a full page image
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 {
self.put_page_version(blknum, lsn, PageVersion::Page(img))
self.put_page_version(blknum, lsn, PageVersion::Page(Page::from_bytes(&img)))
}
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
@@ -382,8 +383,8 @@ impl InMemoryLayer {
let mut mem_usage = 0;
mem_usage += match &pv {
PageVersion::Page(img) => img.len(),
PageVersion::Wal(rec) => rec.rec.len(),
PageVersion::Page(page) => page.image().len(),
PageVersion::Wal(rec) => rec.rec().len(),
};
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
@@ -425,7 +426,7 @@ impl InMemoryLayer {
// subsequent call to initialize the gap page.
let gapstart = self.seg.segno * RELISH_SEG_SIZE + oldsize;
for gapblknum in gapstart..blknum {
let zeropv = PageVersion::Page(ZERO_PAGE.clone());
let zeropv = PageVersion::Page(Page::zero_page());
trace!(
"filling gap blk {} with zeros for write of {}",
gapblknum,

View File

@@ -7,8 +7,6 @@ use zenith_utils::{lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
use zenith_utils::bin_ser::LeSer;
const EMPTY_SLICE: &[(Lsn, ChunkToken)] = &[];
#[derive(Debug, Default)]
@@ -28,7 +26,7 @@ impl PageVersions {
lsn: Lsn,
page_version: PageVersion,
) -> Option<ChunkToken> {
let token = self.buffer.write(page_version.ser().unwrap().as_slice());
let token = self.buffer.write(page_version.bytes());
let map = self.map.entry(blknum).or_insert_with(VecMap::default);
map.append_or_update_last(lsn, token).unwrap()
@@ -79,8 +77,8 @@ impl PageVersions {
}
pub fn get_page_version(&self, token: &ChunkToken) -> Result<PageVersion> {
let buf = self.get_page_version_bytes(token);
Ok(PageVersion::des(buf.as_slice())?) // TODO unwrap
let buf = self.get_page_version_bytes(token).into_boxed_slice();
Ok(PageVersion::from_bytes(buf))
}
}
@@ -127,7 +125,7 @@ impl<'a> Iterator for OrderedPageVersionIter<'a> {
#[cfg(test)]
mod tests {
use bytes::Bytes;
use crate::layered_repository::storage_layer::Page;
use super::*;
@@ -137,8 +135,7 @@ mod tests {
const BLOCKS: u32 = 1000;
const LSNS: u64 = 50;
let empty_page = Bytes::from_static(&[0u8; 8192]);
let empty_page_version = PageVersion::Page(empty_page);
let empty_page_version = PageVersion::Page(Page::zero_page());
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {

View File

@@ -3,10 +3,9 @@
//!
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::repository::{WALRecord, WAL_BIT};
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
@@ -51,12 +50,52 @@ impl SegmentTag {
///
/// A page version can be stored as a full page image, or as WAL record that needs
/// to be applied over the previous page version to reconstruct this version.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub enum PageVersion {
Page(Bytes),
Page(Page),
Wal(WALRecord),
}
impl PageVersion {
pub fn from_bytes(bytes: Box<[u8]>) -> Self {
if bytes[0] & WAL_BIT != 0 {
Self::Wal(WALRecord::from_bytes(bytes))
} else {
Self::Page(Page(bytes))
}
}
pub fn bytes(&self) -> &[u8] {
match self {
Self::Page(page) => page.bytes(),
Self::Wal(wal) => wal.bytes(),
}
}
}
#[derive(Debug, Clone)]
pub struct Page(Box<[u8]>);
impl Page {
pub fn zero_page() -> Self {
// TODO optimize this
Self(vec![0u8; 1 + 8192].into_boxed_slice())
}
pub fn from_bytes(bytes: &[u8]) -> Self {
let mut buf = vec![0u8; 1 + bytes.len()];
buf[1..].copy_from_slice(bytes);
Self(buf.into_boxed_slice())
}
pub fn image(&self) -> &[u8] {
&self.0[1..]
}
pub fn bytes(&self) -> &[u8] {
&self.0
}
}
///
/// Data needed to reconstruct a page version
///
@@ -66,7 +105,7 @@ pub enum PageVersion {
///
pub struct PageReconstructData {
pub records: Vec<(Lsn, WALRecord)>,
pub page_img: Option<Bytes>,
pub page_img: Option<Page>,
}
/// Return value from Layer::get_page_reconstruct_data

View File

@@ -2,8 +2,8 @@ use crate::relish::*;
use crate::CheckpointConfig;
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::convert::TryInto;
use std::ops::{AddAssign, Deref};
use std::sync::Arc;
use std::time::Duration;
@@ -192,14 +192,45 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
fn advance_last_record_lsn(&self, lsn: Lsn);
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WALRecord {
pub will_init: bool,
pub rec: Bytes,
#[derive(Debug, Clone)]
pub struct WALRecord(Box<[u8]>);
pub const WAL_BIT: u8 = 0b1;
const WILL_INIT_BIT: u8 = 0b10;
impl WALRecord {
pub fn new(will_init: bool, main_data_offset: u32, rec: &[u8]) -> Self {
// TODO avoid init
let mut buf = vec![0u8; 1 + 4 + rec.len()];
buf[0] = WAL_BIT | if will_init { WILL_INIT_BIT } else { 0 };
let mdo = u32::to_le_bytes(main_data_offset);
buf[1..5].copy_from_slice(&mdo);
buf[5..].copy_from_slice(rec);
Self(buf.into_boxed_slice())
}
pub fn from_bytes(bytes: Box<[u8]>) -> Self {
Self(bytes)
}
pub fn will_init(&self) -> bool {
self.0[0] & WILL_INIT_BIT != 0
}
// Remember the offset of main_data in rec,
// so that we don't have to parse the record again.
// If record has no main_data, this offset equals rec.len().
pub main_data_offset: u32,
pub fn main_data_offset(&self) -> u32 {
u32::from_le_bytes(self.0[1..5].try_into().unwrap())
}
pub fn rec(&self) -> &[u8] {
&self.0[5..]
}
pub fn bytes(&self) -> &[u8] {
&self.0[..]
}
}
#[cfg(test)]

View File

@@ -427,11 +427,11 @@ pub fn save_decoded_record(
forknum: blk.forknum as u8,
});
let rec = WALRecord {
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let rec = WALRecord::new(
blk.will_init || blk.apply_image,
decoded.main_data_offset as u32,
&recdata[..],
);
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
}
@@ -770,11 +770,7 @@ fn save_xact_record(
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
let rec = WALRecord {
will_init: false,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]);
timeline.put_wal_record(
lsn,
RelishTag::Slru {
@@ -886,11 +882,7 @@ fn save_multixact_create_record(
xlrec: &XlMultiXactCreate,
decoded: &DecodedWALRecord,
) -> Result<()> {
let rec = WALRecord {
will_init: false,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]);
let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;

View File

@@ -317,7 +317,7 @@ impl PostgresRedoManager {
}
// Apply all collected WAL records
for (_lsn, record) in records {
let mut buf = record.rec.clone();
let mut buf = Bytes::from(record.rec().to_vec());
WAL_REDO_RECORD_COUNTER.inc();
@@ -328,7 +328,7 @@ impl PostgresRedoManager {
//move to main data
// TODO probably, we should store some records in our special format
// to avoid this weird parsing on replay
let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
let skip = (record.main_data_offset() - pg_constants::SIZEOF_XLOGRECORD) as usize;
if buf.remaining() > skip {
buf.advance(skip);
}
@@ -574,7 +574,7 @@ impl PostgresRedoProcess {
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
build_apply_record_msg(*lsn, &rec.rec, &mut writebuf);
build_apply_record_msg(*lsn, rec.rec(), &mut writebuf);
}
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);