From b26b4712506d2a297e24e704cba0f3454f91df91 Mon Sep 17 00:00:00 2001 From: Patrick Insinger Date: Thu, 4 Nov 2021 00:24:37 -0700 Subject: [PATCH] Use binary format for PageVersion Still lots of work to do here. Also a lot of copying going on that should be avoided. --- pageserver/src/layered_repository.rs | 6 +-- pageserver/src/layered_repository/blob.rs | 4 +- .../src/layered_repository/delta_layer.rs | 19 ++++--- .../src/layered_repository/image_layer.rs | 4 +- .../src/layered_repository/inmemory_layer.rs | 19 +++---- .../src/layered_repository/page_versions.rs | 13 ++--- .../src/layered_repository/storage_layer.rs | 49 +++++++++++++++++-- pageserver/src/repository.rs | 43 +++++++++++++--- pageserver/src/restore_local_repo.rs | 22 +++------ pageserver/src/walredo.rs | 6 +-- 10 files changed, 124 insertions(+), 61 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 665840cba8..569cf18fd3 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -1601,7 +1601,7 @@ impl LayeredTimeline { rel, request_lsn ); - Ok(img.clone()) + Ok(Bytes::from(img.image().to_vec())) } else { // FIXME: this ought to be an error? warn!("Page {} blk {} at {} not found", rel, blknum, request_lsn); @@ -1612,7 +1612,7 @@ impl LayeredTimeline { // // If we don't have a base image, then the oldest WAL record better initialize // the page - if data.page_img.is_none() && !data.records.first().unwrap().1.will_init { + if data.page_img.is_none() && !data.records.first().unwrap().1.will_init() { // FIXME: this ought to be an error? 
warn!( "Base image for page {}/{} at {} not found, but got {} WAL records", @@ -1632,7 +1632,7 @@ impl LayeredTimeline { rel, blknum, request_lsn, - data.page_img.clone(), + data.page_img.map(|page| Bytes::from(page.image().to_vec())), // FIXME data.records, )?; diff --git a/pageserver/src/layered_repository/blob.rs b/pageserver/src/layered_repository/blob.rs index b7c7c3f460..29e57b4114 100644 --- a/pageserver/src/layered_repository/blob.rs +++ b/pageserver/src/layered_repository/blob.rs @@ -10,10 +10,10 @@ pub struct BlobRange { size: usize, } -pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> { +pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Box<[u8]>> { let mut buf = vec![0u8; range.size]; reader.read_exact_at(&mut buf, range.offset)?; - Ok(buf) + Ok(buf.into_boxed_slice()) } pub struct BlobWriter { diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index b41b0341cb..8a716ad582 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -47,6 +47,7 @@ use crate::waldecoder; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; use anyhow::{bail, ensure, Result}; +use bytes::Bytes; use log::*; use serde::{Deserialize, Serialize}; use zenith_utils::vec_map::VecMap; @@ -204,7 +205,8 @@ impl Layer for DeltaLayer { .iter() .rev(); for ((_blknum, pv_lsn), blob_range) in iter { - let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?; + let pv_bytes = read_blob(&page_version_reader, blob_range)?; + let pv = PageVersion::from_bytes(pv_bytes); match pv { PageVersion::Page(img) => { @@ -214,7 +216,7 @@ impl Layer for DeltaLayer { break; } PageVersion::Wal(rec) => { - let will_init = rec.will_init; + let will_init = rec.will_init(); reconstruct_data.records.push((*pv_lsn, rec)); if will_init { // This WAL record initializes the page, so no need to go further back @@ 
-310,19 +312,20 @@ impl Layer for DeltaLayer { let mut desc = String::new(); let buf = read_blob(&chapter, blob_range)?; - let pv = PageVersion::des(&buf)?; + let pv = PageVersion::from_bytes(buf); match pv { - PageVersion::Page(img) => { - write!(&mut desc, " img {} bytes", img.len())?; + PageVersion::Page(page) => { + write!(&mut desc, " img {} bytes", page.image().len())?; } PageVersion::Wal(rec) => { - let wal_desc = waldecoder::describe_wal_record(&rec.rec); + let wal_desc = + waldecoder::describe_wal_record(&Bytes::from(rec.rec().to_vec())); write!( &mut desc, " rec {} bytes will_init: {} {}", - rec.rec.len(), - rec.will_init, + rec.rec().len(), + rec.will_init(), wal_desc )?; } diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index 179b66853e..6e7af639d9 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -23,7 +23,7 @@ //! use crate::layered_repository::filename::{ImageFileName, PathOrConf}; use crate::layered_repository::storage_layer::{ - Layer, PageReconstructData, PageReconstructResult, SegmentTag, + Layer, Page, PageReconstructData, PageReconstructResult, SegmentTag, }; use crate::layered_repository::LayeredTimeline; use crate::layered_repository::RELISH_SEG_SIZE; @@ -177,7 +177,7 @@ impl Layer for ImageLayer { } }; - reconstruct_data.page_img = Some(Bytes::from(buf)); + reconstruct_data.page_img = Some(Page::from_bytes(&buf)); Ok(PageReconstructResult::Complete) } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 3fab51f76d..2110b81697 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -8,7 +8,6 @@ use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE, }; use 
crate::layered_repository::LayeredTimeline; -use crate::layered_repository::ZERO_PAGE; use crate::layered_repository::{DeltaLayer, ImageLayer}; use crate::repository::WALRecord; use crate::PageServerConf; @@ -24,6 +23,7 @@ use zenith_utils::vec_map::VecMap; use zenith_utils::lsn::Lsn; use super::page_versions::PageVersions; +use super::storage_layer::Page; pub struct InMemoryLayer { conf: &'static PageServerConf, @@ -186,14 +186,15 @@ impl Layer for InMemoryLayer { .rev(); for (entry_lsn, token) in iter { match inner.page_versions.get_page_version(token)? { - PageVersion::Page(img) => { - reconstruct_data.page_img = Some(img); + PageVersion::Page(page) => { + reconstruct_data.page_img = Some(page); need_image = false; break; } PageVersion::Wal(rec) => { - reconstruct_data.records.push((*entry_lsn, rec.clone())); - if rec.will_init { + let will_init = rec.will_init(); + reconstruct_data.records.push((*entry_lsn, rec)); + if will_init { // This WAL record initializes the page, so no need to go further back need_image = false; break; @@ -361,7 +362,7 @@ impl InMemoryLayer { /// Remember new page version, as a full page image pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 { - self.put_page_version(blknum, lsn, PageVersion::Page(img)) + self.put_page_version(blknum, lsn, PageVersion::Page(Page::from_bytes(&img))) } /// Common subroutine of the public put_wal_record() and put_page_image() functions. @@ -382,8 +383,8 @@ impl InMemoryLayer { let mut mem_usage = 0; mem_usage += match &pv { - PageVersion::Page(img) => img.len(), - PageVersion::Wal(rec) => rec.rec.len(), + PageVersion::Page(page) => page.image().len(), + PageVersion::Wal(rec) => rec.rec().len(), }; let old = inner.page_versions.append_or_update_last(blknum, lsn, pv); @@ -425,7 +426,7 @@ impl InMemoryLayer { // subsequent call to initialize the gap page. 
let gapstart = self.seg.segno * RELISH_SEG_SIZE + oldsize; for gapblknum in gapstart..blknum { - let zeropv = PageVersion::Page(ZERO_PAGE.clone()); + let zeropv = PageVersion::Page(Page::zero_page()); trace!( "filling gap blk {} with zeros for write of {}", gapblknum, diff --git a/pageserver/src/layered_repository/page_versions.rs b/pageserver/src/layered_repository/page_versions.rs index 1cbd25b851..d812fe84bc 100644 --- a/pageserver/src/layered_repository/page_versions.rs +++ b/pageserver/src/layered_repository/page_versions.rs @@ -7,8 +7,6 @@ use zenith_utils::{lsn::Lsn, vec_map::VecMap}; use super::storage_layer::PageVersion; -use zenith_utils::bin_ser::LeSer; - const EMPTY_SLICE: &[(Lsn, ChunkToken)] = &[]; #[derive(Debug, Default)] @@ -28,7 +26,7 @@ impl PageVersions { lsn: Lsn, page_version: PageVersion, ) -> Option<ChunkToken> { - let token = self.buffer.write(page_version.ser().unwrap().as_slice()); + let token = self.buffer.write(page_version.bytes()); let map = self.map.entry(blknum).or_insert_with(VecMap::default); map.append_or_update_last(lsn, token).unwrap() @@ -79,8 +77,8 @@ impl PageVersions { } pub fn get_page_version(&self, token: &ChunkToken) -> Result<PageVersion> { - let buf = self.get_page_version_bytes(token); - Ok(PageVersion::des(buf.as_slice())?) 
// TODO unwrap + let buf = self.get_page_version_bytes(token).into_boxed_slice(); + Ok(PageVersion::from_bytes(buf)) } } @@ -127,7 +125,7 @@ impl<'a> Iterator for OrderedPageVersionIter<'a> { #[cfg(test)] mod tests { - use bytes::Bytes; + use crate::layered_repository::storage_layer::Page; use super::*; @@ -137,8 +135,7 @@ mod tests { const BLOCKS: u32 = 1000; const LSNS: u64 = 50; - let empty_page = Bytes::from_static(&[0u8; 8192]); - let empty_page_version = PageVersion::Page(empty_page); + let empty_page_version = PageVersion::Page(Page::zero_page()); for blknum in 0..BLOCKS { for lsn in 0..LSNS { diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index bd800c72a4..d8fe29b2a6 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -3,10 +3,9 @@ //! use crate::relish::RelishTag; -use crate::repository::WALRecord; +use crate::repository::{WALRecord, WAL_BIT}; use crate::{ZTenantId, ZTimelineId}; use anyhow::Result; -use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::fmt; use std::path::PathBuf; @@ -51,12 +50,52 @@ impl SegmentTag { /// /// A page version can be stored as a full page image, or as WAL record that needs /// to be applied over the previous page version to reconstruct this version. 
-#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub enum PageVersion { - Page(Bytes), + Page(Page), Wal(WALRecord), } +impl PageVersion { + pub fn from_bytes(bytes: Box<[u8]>) -> Self { + if bytes[0] & WAL_BIT != 0 { + Self::Wal(WALRecord::from_bytes(bytes)) + } else { + Self::Page(Page(bytes)) + } + } + + pub fn bytes(&self) -> &[u8] { + match self { + Self::Page(page) => page.bytes(), + Self::Wal(wal) => wal.bytes(), + } + } +} + +#[derive(Debug, Clone)] +pub struct Page(Box<[u8]>); + +impl Page { + pub fn zero_page() -> Self { + // TODO optimize this + Self(vec![0u8; 1 + 8192].into_boxed_slice()) + } + pub fn from_bytes(bytes: &[u8]) -> Self { + let mut buf = vec![0u8; 1 + bytes.len()]; + buf[1..].copy_from_slice(bytes); + Self(buf.into_boxed_slice()) + } + + pub fn image(&self) -> &[u8] { + &self.0[1..] + } + + pub fn bytes(&self) -> &[u8] { + &self.0 + } +} + /// /// Data needed to reconstruct a page version /// @@ -66,7 +105,7 @@ pub enum PageVersion { /// pub struct PageReconstructData { pub records: Vec<(Lsn, WALRecord)>, - pub page_img: Option<Bytes>, + pub page_img: Option<Page>, } /// Return value from Layer::get_page_reconstruct_data diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 66d930bb85..ccc7ccebbd 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -2,8 +2,8 @@ use crate::relish::*; use crate::CheckpointConfig; use anyhow::Result; use bytes::Bytes; -use serde::{Deserialize, Serialize}; use std::collections::HashSet; +use std::convert::TryInto; use std::ops::{AddAssign, Deref}; use std::sync::Arc; use std::time::Duration; @@ -192,14 +192,45 @@ pub trait TimelineWriter: Deref { fn advance_last_record_lsn(&self, lsn: Lsn); } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct WALRecord { - pub will_init: bool, - pub rec: Bytes, +#[derive(Debug, Clone)] +pub struct WALRecord(Box<[u8]>); + +pub const WAL_BIT: u8 = 0b1; +const WILL_INIT_BIT: u8 = 0b10; + 
+impl WALRecord { + pub fn new(will_init: bool, main_data_offset: u32, rec: &[u8]) -> Self { + // TODO avoid init + let mut buf = vec![0u8; 1 + 4 + rec.len()]; + buf[0] = WAL_BIT | if will_init { WILL_INIT_BIT } else { 0 }; + let mdo = u32::to_le_bytes(main_data_offset); + buf[1..5].copy_from_slice(&mdo); + buf[5..].copy_from_slice(rec); + Self(buf.into_boxed_slice()) + } + + pub fn from_bytes(bytes: Box<[u8]>) -> Self { + Self(bytes) + } + + pub fn will_init(&self) -> bool { + self.0[0] & WILL_INIT_BIT != 0 + } + // Remember the offset of main_data in rec, // so that we don't have to parse the record again. // If record has no main_data, this offset equals rec.len(). - pub main_data_offset: u32, + pub fn main_data_offset(&self) -> u32 { + u32::from_le_bytes(self.0[1..5].try_into().unwrap()) + } + + pub fn rec(&self) -> &[u8] { + &self.0[5..] + } + + pub fn bytes(&self) -> &[u8] { + &self.0[..] + } } #[cfg(test)] diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 8afa2676e2..1beb7bd51b 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -427,11 +427,11 @@ pub fn save_decoded_record( forknum: blk.forknum as u8, }); - let rec = WALRecord { - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; + let rec = WALRecord::new( + blk.will_init || blk.apply_image, + decoded.main_data_offset as u32, + &recdata[..], + ); timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; } @@ -770,11 +770,7 @@ fn save_xact_record( let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - let rec = WALRecord { - will_init: false, - rec: decoded.record.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; + let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]); timeline.put_wal_record( lsn, RelishTag::Slru { @@ -886,11 +882,7 @@ 
fn save_multixact_create_record( xlrec: &XlMultiXactCreate, decoded: &DecodedWALRecord, ) -> Result<()> { - let rec = WALRecord { - will_init: false, - rec: decoded.record.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; + let rec = WALRecord::new(false, decoded.main_data_offset as u32, &decoded.record[..]); let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index dc0968250b..b063d616d5 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -317,7 +317,7 @@ impl PostgresRedoManager { } // Apply all collected WAL records for (_lsn, record) in records { - let mut buf = record.rec.clone(); + let mut buf = Bytes::from(record.rec().to_vec()); WAL_REDO_RECORD_COUNTER.inc(); @@ -328,7 +328,7 @@ impl PostgresRedoManager { //move to main data // TODO probably, we should store some records in our special format // to avoid this weird parsing on replay - let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; + let skip = (record.main_data_offset() - pg_constants::SIZEOF_XLOGRECORD) as usize; if buf.remaining() > skip { buf.advance(skip); } @@ -574,7 +574,7 @@ impl PostgresRedoProcess { build_push_page_msg(tag, &img, &mut writebuf); } for (lsn, rec) in records.iter() { - build_apply_record_msg(*lsn, &rec.rec, &mut writebuf); + build_apply_record_msg(*lsn, rec.rec(), &mut writebuf); } build_get_page_msg(tag, &mut writebuf); WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);