From e4670a5f1e8e07269a7631461eef3d0890f7e112 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 23 Feb 2022 21:04:39 +0200 Subject: [PATCH] Remove the PageVersions abstraction. Since commit fdd987c3ad, it was only used in InMemoryLayers. Let's just "inline" the code into InMemoryLayer itself. I originally did this as part of a bigger PR (#1267). With that PR, one in-memory layer, and one ephemeral file, would hold page versions belonging to multiple segments. Currently, PageVersions can only hold versions for a single segment, so that would need to be changed. Rather than modify PageVersions to support that, just remove it altogether. --- pageserver/src/layered_repository.rs | 1 - .../src/layered_repository/inmemory_layer.rs | 199 ++++++++----- .../src/layered_repository/page_versions.rs | 268 ------------------ vendor/postgres | 2 +- 4 files changed, 136 insertions(+), 334 deletions(-) delete mode 100644 pageserver/src/layered_repository/page_versions.rs diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index a245cbd35d..c4bc23af4b 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -64,7 +64,6 @@ mod inmemory_layer; mod interval_tree; mod layer_map; pub mod metadata; -mod page_versions; mod par_fsync; mod storage_layer; diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index b710ca1c07..fe4a06d0a5 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -20,13 +20,15 @@ use crate::{ZTenantId, ZTimelineId}; use anyhow::{ensure, Result}; use bytes::Bytes; use log::*; +use std::collections::HashMap; +use std::io::Seek; +use std::os::unix::fs::FileExt; use std::path::PathBuf; use std::sync::{Arc, RwLock}; +use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; use zenith_utils::vec_map::VecMap; -use super::page_versions::PageVersions; - pub struct InMemoryLayer { conf: &'static PageServerConf, tenantid: ZTenantId, @@ -71,11 +73,15 @@ pub struct InMemoryLayerInner { /// The drop LSN is recorded in [`end_lsn`]. dropped: bool, - /// - /// All versions of all pages in the layer are are kept here. - /// Indexed by block number and LSN. - /// - page_versions: PageVersions, + /// The PageVersion structs are stored in a serialized format in this file. + /// Each serialized PageVersion is preceded by a 'u32' length field. + /// 'page_versions' map stores offsets into this file. + file: EphemeralFile, + + /// Metadata about all versions of all pages in the layer is kept + /// here. Indexed by block number and LSN. The value is an offset + /// into the ephemeral file where the page version is stored. + page_versions: HashMap>, /// /// `seg_sizes` tracks the size of the segment at different points in time. @@ -111,6 +117,50 @@ impl InMemoryLayerInner { panic!("could not find seg size in in-memory layer"); } } + + /// + /// Read a page version from the ephemeral file. + /// + fn read_pv(&self, off: u64) -> Result { + let mut buf = Vec::new(); + self.read_pv_bytes(off, &mut buf)?; + Ok(PageVersion::des(&buf)?) + } + + /// + /// Read a page version from the ephemeral file, as raw bytes, at + /// the given offset. The bytes are read into 'buf', which is + /// expanded if necessary. Returns the size of the page version. + /// + fn read_pv_bytes(&self, off: u64, buf: &mut Vec) -> Result { + // read length + let mut lenbuf = [0u8; 4]; + self.file.read_exact_at(&mut lenbuf, off)?; + let len = u32::from_ne_bytes(lenbuf) as usize; + + if buf.len() < len { + buf.resize(len, 0); + } + self.file.read_exact_at(&mut buf[0..len], off + 4)?; + Ok(len) + } + + fn write_pv(&mut self, pv: &PageVersion) -> Result { + // remember starting position + let pos = self.file.stream_position()?; + + // make room for the 'length' field by writing zeros as a placeholder. + self.file.seek(std::io::SeekFrom::Start(pos + 4)).unwrap(); + + pv.ser_into(&mut self.file).unwrap(); + + // write the 'length' field. + let len = self.file.stream_position()? - pos - 4; + let lenbuf = u32::to_ne_bytes(len as u32); + self.file.write_all_at(&lenbuf, pos)?; + + Ok(pos) + } } impl Layer for InMemoryLayer { @@ -185,33 +235,31 @@ impl Layer for InMemoryLayer { let inner = self.inner.read().unwrap(); // Scan the page versions backwards, starting from `lsn`. - let iter = inner - .page_versions - .get_block_lsn_range(blknum, ..=lsn) - .iter() - .rev(); - for (entry_lsn, pos) in iter { - match &cached_img_lsn { - Some(cached_lsn) if entry_lsn <= cached_lsn => { - return Ok(PageReconstructResult::Cached) + if let Some(vec_map) = inner.page_versions.get(&blknum) { + let slice = vec_map.slice_range(..=lsn); + for (entry_lsn, pos) in slice.iter().rev() { + match &cached_img_lsn { + Some(cached_lsn) if entry_lsn <= cached_lsn => { + return Ok(PageReconstructResult::Cached) + } + _ => {} } - _ => {} - } - let pv = inner.page_versions.read_pv(*pos)?; - match pv { - PageVersion::Page(img) => { - reconstruct_data.page_img = Some(img); - need_image = false; - break; - } - PageVersion::Wal(rec) => { - reconstruct_data.records.push((*entry_lsn, rec.clone())); - if rec.will_init() { - // This WAL record initializes the page, so no need to go further back + let pv = inner.read_pv(*pos)?; + match pv { + PageVersion::Page(img) => { + reconstruct_data.page_img = Some(img); need_image = false; break; } + PageVersion::Wal(rec) => { + reconstruct_data.records.push((*entry_lsn, rec.clone())); + if rec.will_init() { + // This WAL record initializes the page, so no need to go further back + need_image = false; + break; + } + } } } } @@ -317,14 +365,22 @@ impl Layer for InMemoryLayer { println!("seg_sizes {}: {}", k, v); } - for (blknum, lsn, pos) in inner.page_versions.ordered_page_version_iter(None) { - let pv = inner.page_versions.read_pv(pos)?; - let pv_description = match pv { - PageVersion::Page(_img) => "page", - PageVersion::Wal(_rec) => "wal", - }; + // List the blocks in order + let mut page_versions: Vec<(&SegmentBlk, &VecMap)> = + inner.page_versions.iter().collect(); + page_versions.sort_by_key(|k| k.0); - println!("blk {} at {}: {}\n", blknum, lsn, pv_description); + for (blknum, versions) in page_versions { + for (lsn, off) in versions.as_slice() { + let pv = inner.read_pv(*off); + let pv_description = match pv { + Ok(PageVersion::Page(_img)) => "page", + Ok(PageVersion::Wal(_rec)) => "wal", + Err(_err) => "INVALID", + }; + + println!("blk {} at {}: {}\n", blknum, lsn, pv_description); + } } Ok(()) @@ -385,7 +441,8 @@ impl InMemoryLayer { inner: RwLock::new(InMemoryLayerInner { end_lsn: None, dropped: false, - page_versions: PageVersions::new(file), + file, + page_versions: HashMap::new(), seg_sizes, latest_lsn: oldest_lsn, }), @@ -427,14 +484,18 @@ impl InMemoryLayer { assert!(lsn >= inner.latest_lsn); inner.latest_lsn = lsn; - let old = inner.page_versions.append_or_update_last(blknum, lsn, pv)?; - - if old.is_some() { - // We already had an entry for this LSN. That's odd.. - warn!( - "Page version of rel {} blk {} at {} already exists", - self.seg.rel, blknum, lsn - ); + // Write the page version to the file, and remember its offset in 'page_versions' + { + let off = inner.write_pv(&pv)?; + let vec_map = inner.page_versions.entry(blknum).or_default(); + let old = vec_map.append_or_update_last(lsn, off).unwrap().0; + if old.is_some() { + // We already had an entry for this LSN. That's odd.. + warn!( + "Page version of rel {} blk {} at {} already exists", + self.seg.rel, blknum, lsn + ); + } } // Also update the relation size, if this extended the relation. @@ -468,16 +529,19 @@ impl InMemoryLayer { gapblknum, blknum ); - let old = inner - .page_versions - .append_or_update_last(gapblknum, lsn, zeropv)?; - // We already had an entry for this LSN. That's odd.. - if old.is_some() { - warn!( - "Page version of seg {} blk {} at {} already exists", - self.seg, blknum, lsn - ); + // Write the page version to the file, and remember its offset in + // 'page_versions' + { + let off = inner.write_pv(&zeropv)?; + let vec_map = inner.page_versions.entry(gapblknum).or_default(); + let old = vec_map.append_or_update_last(lsn, off).unwrap().0; + if old.is_some() { + warn!( + "Page version of seg {} blk {} at {} already exists", + self.seg, gapblknum, lsn + ); + } } } @@ -570,7 +634,8 @@ impl InMemoryLayer { inner: RwLock::new(InMemoryLayerInner { end_lsn: None, dropped: false, - page_versions: PageVersions::new(file), + file, + page_versions: HashMap::new(), seg_sizes, latest_lsn: oldest_lsn, }), @@ -599,8 +664,10 @@ impl InMemoryLayer { assert!(lsn <= &end_lsn, "{:?} {:?}", lsn, end_lsn); } - for (_blk, lsn, _pv) in inner.page_versions.ordered_page_version_iter(None) { - assert!(lsn <= end_lsn); + for (_blk, vec_map) in inner.page_versions.iter() { + for (lsn, _pos) in vec_map.as_slice() { + assert!(*lsn <= end_lsn); + } } } } @@ -678,15 +745,19 @@ impl InMemoryLayer { self.is_dropped(), )?; - // Write all page versions + // Write all page versions, in block + LSN order let mut buf: Vec = Vec::new(); - let page_versions_iter = inner - .page_versions - .ordered_page_version_iter(Some(delta_end_lsn)); - for (blknum, lsn, pos) in page_versions_iter { - let len = inner.page_versions.read_pv_bytes(pos, &mut buf)?; - delta_layer_writer.put_page_version(blknum, lsn, &buf[..len])?; + let pv_iter = inner.page_versions.iter(); + let mut pages: Vec<(&SegmentBlk, &VecMap)> = pv_iter.collect(); + pages.sort_by_key(|(blknum, _vec_map)| *blknum); + for (blknum, vec_map) in pages { + for (lsn, pos) in vec_map.as_slice() { + if *lsn < delta_end_lsn { + let len = inner.read_pv_bytes(*pos, &mut buf)?; + delta_layer_writer.put_page_version(*blknum, *lsn, &buf[..len])?; + } + } } // Create seg_sizes diff --git a/pageserver/src/layered_repository/page_versions.rs b/pageserver/src/layered_repository/page_versions.rs deleted file mode 100644 index 9a1fa26eb9..0000000000 --- a/pageserver/src/layered_repository/page_versions.rs +++ /dev/null @@ -1,268 +0,0 @@ -//! -//! Data structure to ingest incoming WAL into an append-only file. -//! -//! - The file is considered temporary, and will be discarded on crash -//! - based on a B-tree -//! - -use std::os::unix::fs::FileExt; -use std::{collections::HashMap, ops::RangeBounds, slice}; - -use anyhow::Result; - -use std::cmp::min; -use std::io::Seek; - -use zenith_utils::{lsn::Lsn, vec_map::VecMap}; - -use super::storage_layer::PageVersion; -use crate::layered_repository::ephemeral_file::EphemeralFile; - -use zenith_utils::bin_ser::BeSer; - -const EMPTY_SLICE: &[(Lsn, u64)] = &[]; - -pub struct PageVersions { - map: HashMap>, - - /// The PageVersion structs are stored in a serialized format in this file. - /// Each serialized PageVersion is preceded by a 'u32' length field. - /// The 'map' stores offsets into this file. - file: EphemeralFile, -} - -impl PageVersions { - pub fn new(file: EphemeralFile) -> PageVersions { - PageVersions { - map: HashMap::new(), - file, - } - } - - pub fn append_or_update_last( - &mut self, - blknum: u32, - lsn: Lsn, - page_version: PageVersion, - ) -> Result> { - // remember starting position - let pos = self.file.stream_position()?; - - // make room for the 'length' field by writing zeros as a placeholder. - self.file.seek(std::io::SeekFrom::Start(pos + 4)).unwrap(); - - page_version.ser_into(&mut self.file).unwrap(); - - // write the 'length' field. - let len = self.file.stream_position()? - pos - 4; - let lenbuf = u32::to_ne_bytes(len as u32); - self.file.write_all_at(&lenbuf, pos)?; - - let map = self.map.entry(blknum).or_insert_with(VecMap::default); - Ok(map.append_or_update_last(lsn, pos as u64).unwrap().0) - } - - /// Get all [`PageVersion`]s in a block - fn get_block_slice(&self, blknum: u32) -> &[(Lsn, u64)] { - self.map - .get(&blknum) - .map(VecMap::as_slice) - .unwrap_or(EMPTY_SLICE) - } - - /// Get a range of [`PageVersions`] in a block - pub fn get_block_lsn_range>(&self, blknum: u32, range: R) -> &[(Lsn, u64)] { - self.map - .get(&blknum) - .map(|vec_map| vec_map.slice_range(range)) - .unwrap_or(EMPTY_SLICE) - } - - /// Iterate through [`PageVersion`]s in (block, lsn) order. - /// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn` - pub fn ordered_page_version_iter(&self, cutoff_lsn: Option) -> OrderedPageVersionIter<'_> { - let mut ordered_blocks: Vec = self.map.keys().cloned().collect(); - ordered_blocks.sort_unstable(); - - let slice = ordered_blocks - .first() - .map(|&blknum| self.get_block_slice(blknum)) - .unwrap_or(EMPTY_SLICE); - - OrderedPageVersionIter { - page_versions: self, - ordered_blocks, - cur_block_idx: 0, - cutoff_lsn, - cur_slice_iter: slice.iter(), - } - } - - /// - /// Read a page version. - /// - pub fn read_pv(&self, off: u64) -> Result { - let mut buf = Vec::new(); - self.read_pv_bytes(off, &mut buf)?; - Ok(PageVersion::des(&buf)?) - } - - /// - /// Read a page version, as raw bytes, at the given offset. The bytes - /// are read into 'buf', which is expanded if necessary. Returns the - /// size of the page version. - /// - pub fn read_pv_bytes(&self, off: u64, buf: &mut Vec) -> Result { - // read length - let mut lenbuf = [0u8; 4]; - self.file.read_exact_at(&mut lenbuf, off)?; - let len = u32::from_ne_bytes(lenbuf) as usize; - - // Resize the buffer to fit the data, if needed. - // - // We don't shrink the buffer if it's larger than necessary. That avoids - // repeatedly shrinking and expanding when you reuse the same buffer to - // read multiple page versions. Expanding a Vec requires initializing the - // new bytes, which is a waste of time because we're immediately overwriting - // it, but there's no way to avoid it without resorting to unsafe code. - if buf.len() < len { - buf.resize(len, 0); - } - self.file.read_exact_at(&mut buf[0..len], off + 4)?; - - Ok(len) - } -} - -pub struct PageVersionReader<'a> { - file: &'a EphemeralFile, - pos: u64, - end_pos: u64, -} - -impl<'a> std::io::Read for PageVersionReader<'a> { - fn read(&mut self, buf: &mut [u8]) -> Result { - let len = min(buf.len(), (self.end_pos - self.pos) as usize); - let n = self.file.read_at(&mut buf[..len], self.pos)?; - self.pos += n as u64; - Ok(n) - } -} - -pub struct OrderedPageVersionIter<'a> { - page_versions: &'a PageVersions, - - ordered_blocks: Vec, - cur_block_idx: usize, - - cutoff_lsn: Option, - - cur_slice_iter: slice::Iter<'a, (Lsn, u64)>, -} - -impl OrderedPageVersionIter<'_> { - fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool { - if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() { - lsn < cutoff_lsn - } else { - true - } - } -} - -impl<'a> Iterator for OrderedPageVersionIter<'a> { - type Item = (u32, Lsn, u64); - - fn next(&mut self) -> Option { - loop { - if let Some((lsn, pos)) = self.cur_slice_iter.next() { - if self.is_lsn_before_cutoff(lsn) { - let blknum = self.ordered_blocks[self.cur_block_idx]; - return Some((blknum, *lsn, *pos)); - } - } - - let next_block_idx = self.cur_block_idx + 1; - let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?; - self.cur_block_idx = next_block_idx; - self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter(); - } - } -} - -#[cfg(test)] -mod tests { - use bytes::Bytes; - - use super::*; - use crate::config::PageServerConf; - use std::fs; - use std::str::FromStr; - use zenith_utils::zid::{ZTenantId, ZTimelineId}; - - fn repo_harness(test_name: &str) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId)> { - let repo_dir = PageServerConf::test_repo_dir(test_name); - let _ = fs::remove_dir_all(&repo_dir); - let conf = PageServerConf::dummy_conf(repo_dir); - // Make a static copy of the config. This can never be free'd, but that's - // OK in a test. - let conf: &'static PageServerConf = Box::leak(Box::new(conf)); - - let tenantid = ZTenantId::from_str("11000000000000000000000000000000").unwrap(); - let timelineid = ZTimelineId::from_str("22000000000000000000000000000000").unwrap(); - fs::create_dir_all(conf.timeline_path(&timelineid, &tenantid))?; - - Ok((conf, tenantid, timelineid)) - } - - #[test] - fn test_ordered_iter() -> Result<()> { - let (conf, tenantid, timelineid) = repo_harness("test_ordered_iter")?; - - let file = EphemeralFile::create(conf, tenantid, timelineid)?; - - let mut page_versions = PageVersions::new(file); - - const BLOCKS: u32 = 1000; - const LSNS: u64 = 50; - - let empty_page = Bytes::from_static(&[0u8; 8192]); - let empty_page_version = PageVersion::Page(empty_page); - - for blknum in 0..BLOCKS { - for lsn in 0..LSNS { - let old = page_versions.append_or_update_last( - blknum, - Lsn(lsn), - empty_page_version.clone(), - )?; - assert!(old.is_none()); - } - } - - let mut iter = page_versions.ordered_page_version_iter(None); - for blknum in 0..BLOCKS { - for lsn in 0..LSNS { - let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap(); - assert_eq!(actual_blknum, blknum); - assert_eq!(Lsn(lsn), actual_lsn); - } - } - assert!(iter.next().is_none()); - assert!(iter.next().is_none()); // should be robust against excessive next() calls - - const CUTOFF_LSN: Lsn = Lsn(30); - let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN)); - for blknum in 0..BLOCKS { - for lsn in 0..CUTOFF_LSN.0 { - let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap(); - assert_eq!(actual_blknum, blknum); - assert_eq!(Lsn(lsn), actual_lsn); - } - } - assert!(iter.next().is_none()); - assert!(iter.next().is_none()); // should be robust against excessive next() calls - - Ok(()) - } -} diff --git a/vendor/postgres b/vendor/postgres index 1872ba6cef..a3709cc364 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 1872ba6cef7cd73d3b1e7169945b47c572c27f51 +Subproject commit a3709cc3643dd28c30b2b8f603ba3d60a586afb9