diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 4d806d39c0..ec1017454f 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -1568,6 +1568,7 @@ impl LayeredTimeline {
         // call it again on the predecessor layer until we have all the required data.
         let mut layer_ref = layer;
         let mut curr_lsn = lsn;
+        let mut cacheable_result: Option<Lsn> = None;
         loop {
             match layer_ref.get_page_reconstruct_data(
                 blknum,
@@ -1575,7 +1576,15 @@
                 curr_lsn,
                 cached_lsn_opt,
                 &mut data,
             )? {
-                PageReconstructResult::Complete => break,
+                PageReconstructResult::Complete => {
+                    if curr_lsn == lsn {
+                        // We have an opportunity to cache this page
+                        if let Some((rec_lsn, _rec)) = data.records.first() {
+                            cacheable_result = Some(*rec_lsn);
+                        }
+                    }
+                    break;
+                }
                 PageReconstructResult::Continue(cont_lsn) => {
                     // Fetch base image / more WAL from the returned predecessor layer
                     if let Some((cont_layer, cont_lsn)) = self.get_layer_for_read(seg, cont_lsn)? {
@@ -1631,7 +1640,13 @@
             }
         }
 
-        self.reconstruct_page(seg.rel, blknum, lsn, data)
+        let img = self.reconstruct_page(seg.rel, blknum, lsn, data)?;
+
+        if let Some(cache_lsn) = cacheable_result {
+            layer_ref.cache_page_image(blknum, cache_lsn, &img)?;
+        }
+
+        Ok(img)
     }
 
     ///
diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs
index 4c913b9622..58321c8d52 100644
--- a/pageserver/src/layered_repository/inmemory_layer.rs
+++ b/pageserver/src/layered_repository/inmemory_layer.rs
@@ -15,6 +15,7 @@ use crate::PageServerConf;
 use crate::{ZTenantId, ZTimelineId};
 use anyhow::{ensure, Result};
 use bytes::Bytes;
+use lazy_static::lazy_static;
 use log::*;
 use std::path::PathBuf;
 use std::sync::{Arc, RwLock};
@@ -23,6 +24,17 @@ use zenith_utils::vec_map::VecMap;
 
 use super::page_versions::PageVersions;
 
+use zenith_metrics::{register_int_counter, IntCounter};
+
+lazy_static! {
+    static ref LATEST_IMG_UPDATE_COUNTER: IntCounter =
+        register_int_counter!("latest_img_updates", "Number of updates of latest img").unwrap();
+    static ref LATEST_IMG_MISS_COUNTER: IntCounter =
+        register_int_counter!("latest_img_misses", "Number of cache misses of latest img").unwrap();
+    static ref LATEST_IMG_HIT_COUNTER: IntCounter =
+        register_int_counter!("latest_img_hits", "Number of cache hits of latest img").unwrap();
+}
+
 pub struct OpenLayer {
     conf: &'static PageServerConf,
     tenantid: ZTenantId,
@@ -160,6 +172,8 @@ impl Layer for OpenLayer {
     {
         let inner = self.inner.read().unwrap();
 
+        let latest = inner.page_versions.get_latest(blknum);
+
         // Scan the page versions backwards, starting from `lsn`.
         let iter = inner
             .page_versions
@@ -182,6 +196,18 @@
                     break;
                 }
                 PageVersion::Wal(rec) => {
+                    if let Some((latest_lsn, latest_pos)) = latest {
+                        if latest_lsn == entry_lsn {
+                            // we had this cached, nice!
+                            let img = inner.page_versions.fetch_cached_latest(*latest_pos)?;
+                            reconstruct_data.page_img = Some(img);
+                            need_image = false;
+                            LATEST_IMG_HIT_COUNTER.inc();
+                            break;
+                        }
+                    }
+                    LATEST_IMG_MISS_COUNTER.inc();
+
                     reconstruct_data.records.push((*entry_lsn, rec.clone()));
                     if rec.will_init {
                         // This WAL record initializes the page, so no need to go further back
@@ -207,6 +233,14 @@
         }
     }
 
+    fn cache_page_image(&self, blknum: u32, lsn: Lsn, img: &[u8]) -> Result<()> {
+        let mut inner = self.inner.write().unwrap();
+
+        LATEST_IMG_UPDATE_COUNTER.inc();
+
+        inner.page_versions.cache_latest(blknum, lsn, img)
+    }
+
     /// Get size of the relation at given LSN
     fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
        assert!(lsn >= self.start_lsn);
diff --git a/pageserver/src/layered_repository/page_versions.rs b/pageserver/src/layered_repository/page_versions.rs
index 7fabdad77b..6c6c65c9a2 100644
--- a/pageserver/src/layered_repository/page_versions.rs
+++ b/pageserver/src/layered_repository/page_versions.rs
@@ -9,15 +9,18 @@ use std::os::unix::fs::FileExt;
 use std::{collections::HashMap, ops::RangeBounds, slice};
 
 use anyhow::Result;
+use bytes::{Bytes, BytesMut};
 use std::cmp::min;
-use std::io::Seek;
+use std::io::{Seek, SeekFrom};
 use zenith_utils::{lsn::Lsn, vec_map::VecMap};
 
 use super::storage_layer::PageVersion;
 use crate::layered_repository::ephemeral_file::EphemeralFile;
 
+use postgres_ffi::pg_constants::BLCKSZ;
+
 use zenith_utils::bin_ser::LeSer;
 
 const EMPTY_SLICE: &[(Lsn, u64)] = &[];
@@ -25,6 +28,8 @@
 pub struct PageVersions {
     map: HashMap<u32, VecMap<Lsn, u64>>,
 
+    latest_map: HashMap<u32, (Lsn, u64)>,
+
     /// The PageVersion structs are stored in a serialized format in this file.
     /// Each serialized PageVersion is preceded by a 'u32' length field.
     /// The 'map' stores offsets into this file.
@@ -35,10 +40,48 @@ impl PageVersions {
     pub fn new(file: EphemeralFile) -> PageVersions {
         PageVersions {
             map: HashMap::new(),
+            latest_map: HashMap::new(),
             file,
         }
     }
 
+    pub fn cache_latest(&mut self, blknum: u32, lsn: Lsn, img: &[u8]) -> Result<()> {
+        if img.len() != BLCKSZ as usize {
+            return Ok(());
+        }
+
+        let pos = if let Some((_lsn, pos)) = self.latest_map.get(&blknum) {
+            *pos
+        } else {
+            let pos = self.file.stream_position()?;
+            // round up to nearest page boundary for performance
+            //let pos = (pos + BLCKSZ as u64 - 1) & !(BLCKSZ as u64 - 1);
+
+            self.file.seek(SeekFrom::Start(pos + BLCKSZ as u64))?;
+
+            pos
+        };
+
+        self.file.write_all_at(img, pos)?;
+
+        self.latest_map.insert(blknum, (lsn, pos));
+
+        Ok(())
+    }
+
+    pub fn get_latest(&self, blknum: u32) -> Option<&(Lsn, u64)> {
+        self.latest_map.get(&blknum)
+    }
+
+    pub fn fetch_cached_latest(&self, pos: u64) -> Result<Bytes> {
+        let mut buf = BytesMut::with_capacity(BLCKSZ as usize);
+        buf.resize(BLCKSZ as usize, 0u8);
+        if let Err(err) = self.file.read_exact_at(buf.as_mut(), pos) {
+            tracing::error!("read_exact_at {} failed: {:?}", pos, err);
+        }
+        Ok(buf.freeze())
+    }
+
     pub fn append_or_update_last(
         &mut self,
         blknum: u32,
@@ -49,7 +92,7 @@
         let pos = self.file.stream_position()?;
 
         // make room for the 'length' field by writing zeros as a placeholder.
-        self.file.seek(std::io::SeekFrom::Start(pos + 4)).unwrap();
+        self.file.seek(SeekFrom::Start(pos + 4)).unwrap();
 
         page_version.ser_into(&mut self.file).unwrap();
diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs
index 5ee4f4024d..d3391a0941 100644
--- a/pageserver/src/layered_repository/storage_layer.rs
+++ b/pageserver/src/layered_repository/storage_layer.rs
@@ -145,6 +145,10 @@ pub trait Layer: Send + Sync {
         reconstruct_data: &mut PageReconstructData,
     ) -> Result<PageReconstructResult>;
 
+    fn cache_page_image(&self, _blknum: u32, _lsn: Lsn, _img: &[u8]) -> Result<()> {
+        Ok(())
+    }
+
     /// Return size of the segment at given LSN. (Only for blocky relations.)
     fn get_seg_size(&self, lsn: Lsn) -> Result<u32>;
diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs
index 24cef26169..c2d1eb83f8 100644
--- a/pageserver/src/virtual_file.rs
+++ b/pageserver/src/virtual_file.rs
@@ -357,7 +357,11 @@ impl Seek for VirtualFile {
 
 impl FileExt for VirtualFile {
     fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
-        self.with_file(|file| file.read_at(buf, offset))?
+        let result = self.with_file(|file| file.read_at(buf, offset))?;
+        if let Err(err) = &result {
+            tracing::error!("read_at error: {:?}", err);
+        }
+        result
     }
 
     fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
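
Below is a minimal standalone sketch of the caching idea in the diff, not part of the patch itself: it assumes a simplified Lsn (u64) and keeps the image in memory, whereas the patch stores it in the layer's ephemeral file and tracks hits/misses with metrics counters. The point it illustrates is the same: remember the most recently materialized image per block together with the LSN it was built at, and reuse it when a later lookup asks for exactly that LSN instead of replaying WAL again.

use std::collections::HashMap;

type Lsn = u64;

#[derive(Default)]
struct LatestImageCache {
    // blknum -> (LSN the image was materialized at, page image)
    latest: HashMap<u32, (Lsn, Vec<u8>)>,
}

impl LatestImageCache {
    // Remember the image materialized for `blknum` at `lsn`,
    // replacing any older cached image for the same block.
    fn cache_latest(&mut self, blknum: u32, lsn: Lsn, img: &[u8]) {
        self.latest.insert(blknum, (lsn, img.to_vec()));
    }

    // Return the cached image only if it was materialized exactly at `lsn`;
    // otherwise the caller falls back to base image + WAL redo.
    fn get_if_current(&self, blknum: u32, lsn: Lsn) -> Option<&[u8]> {
        match self.latest.get(&blknum) {
            Some((cached_lsn, img)) if *cached_lsn == lsn => Some(img.as_slice()),
            _ => None,
        }
    }
}

fn main() {
    let mut cache = LatestImageCache::default();
    cache.cache_latest(7, 0x1000, &[0u8; 8192]);
    assert!(cache.get_if_current(7, 0x1000).is_some()); // hit: reuse the image
    assert!(cache.get_if_current(7, 0x2000).is_none()); // miss: reconstruct via WAL redo
}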