Compare commits

3 Commits

Author SHA1 Message Date
Heikki Linnakangas
fa057c04ac WIP: Cache latest image of each page
Just an experiment. I'm not sure if this makes sense, given that we now
cache materialized page versions in the page cache. This is different from
that, though, in that it stores the image in the ephemeral file, so it will
get spilled to disk. (While the ephemeral file page stays in memory, we are
caching the image twice, so at least that needs some work.)
2021-11-15 09:25:05 +02:00
Heikki Linnakangas
6114a5d0d5 Use the page cache for the "ephemeral files" that back the open layers. 2021-11-15 09:25:03 +02:00
Heikki Linnakangas
04c4167e72 Replace in-memory layers and OOM-triggered eviction with temp files.
The "in-memory layer" is misnomer now, each in-memory layer is now actually
backed by a file. The files are ephemeral, in that they don't survive page
server crash or shutdown.

This is a bit slow, as we now access the file for every operation. The
next commit adds code to use the page cache for these files.

This includes changes from the 'inmemory-layer-chunks' branch to serialize
the page versions when they are added to the open layer. The difference is
that they are not serialized into the expandable in-memory "chunk buffer",
but written out to the file. (A sketch of the on-file format follows the
commit list.)
2021-11-12 21:52:03 +02:00
13 changed files with 753 additions and 294 deletions
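
The serialization scheme described in the last commit message shows up in the PageVersions diff below: each PageVersion is appended to the ephemeral file behind a u32 length prefix, and the in-memory maps store only byte offsets into that file. A minimal sketch of that framing, with plain std types standing in for EphemeralFile (the real code writes a placeholder length first and backfills it after serializing, since the serialized size isn't known up front):

use std::collections::HashMap;
use std::io::{Cursor, Read, Seek, SeekFrom, Write};

// Append one length-prefixed record, returning its starting offset.
// (Cursor<Vec<u8>> stands in for the ephemeral file.)
fn append_record<F: Write + Seek>(file: &mut F, payload: &[u8]) -> std::io::Result<u64> {
    let pos = file.seek(SeekFrom::End(0))?;
    file.write_all(&[0u8; 4])?; // placeholder for the length field
    file.write_all(payload)?;
    let len = (file.stream_position()? - pos - 4) as u32;
    file.seek(SeekFrom::Start(pos))?;
    file.write_all(&len.to_ne_bytes())?; // backfill the length
    Ok(pos)
}

// Read back the record that starts at offset 'pos'.
fn read_record<F: Read + Seek>(file: &mut F, pos: u64) -> std::io::Result<Vec<u8>> {
    file.seek(SeekFrom::Start(pos))?;
    let mut lenbuf = [0u8; 4];
    file.read_exact(&mut lenbuf)?;
    let mut payload = vec![0u8; u32::from_ne_bytes(lenbuf) as usize];
    file.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> std::io::Result<()> {
    let mut file = Cursor::new(Vec::new());
    // The per-block VecMaps in PageVersions reduce to this flat map here:
    let mut map: HashMap<(u32, u64), u64> = HashMap::new(); // (blknum, lsn) -> offset
    map.insert((7, 0x10), append_record(&mut file, b"a WAL record")?);
    map.insert((7, 0x20), append_record(&mut file, b"a page image")?);
    assert_eq!(read_record(&mut file, map[&(7, 0x20)])?, b"a page image".to_vec());
    Ok(())
}

The first, experimental commit layers a second use onto the same file: full-size page images written to reserved block-sized slots, tracked by a separate blknum-to-(lsn, offset) map.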

View File

@@ -54,6 +54,7 @@ use zenith_utils::seqwait::SeqWait;
mod blob;
mod delta_layer;
mod ephemeral_file;
mod filename;
mod global_layer_map;
mod image_layer;
@@ -68,12 +69,14 @@ use delta_layer::DeltaLayer;
use image_layer::ImageLayer;
use global_layer_map::{LayerId, GLOBAL_LAYER_MAP};
use inmemory_layer::InMemoryLayer;
use inmemory_layer::OpenLayer;
use layer_map::LayerMap;
use storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
};
pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
@@ -1025,7 +1028,7 @@ impl LayeredTimeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<InMemoryLayer>> {
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<OpenLayer>> {
let mut layers = self.layers.lock().unwrap();
assert!(lsn.is_aligned());
@@ -1056,14 +1059,8 @@ impl LayeredTimeline {
lsn
);
layer = InMemoryLayer::create(
self.conf,
self.timelineid,
self.tenantid,
seg,
lsn,
lsn,
)?;
layer =
OpenLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
} else {
return Ok(open_layer);
}
@@ -1099,7 +1096,7 @@ impl LayeredTimeline {
prev_layer.get_start_lsn(),
prev_layer.get_end_lsn()
);
layer = InMemoryLayer::create_successor_layer(
layer = OpenLayer::create_successor_layer(
self.conf,
prev_layer,
self.timelineid,
@@ -1116,11 +1113,10 @@ impl LayeredTimeline {
lsn
);
layer =
InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
layer = OpenLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
}
let layer_rc: Arc<InMemoryLayer> = Arc::new(layer);
let layer_rc: Arc<OpenLayer> = Arc::new(layer);
layers.insert_open(Arc::clone(&layer_rc));
Ok(layer_rc)
@@ -1517,8 +1513,9 @@ impl LayeredTimeline {
}
fn lookup_cached_page(&self, seg: &SegmentTag, blknum: u32, lsn: Lsn) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
if let RelishTag::Relation(rel_tag) = &seg.rel {
let (lsn, read_guard) = page_cache::get().lookup_materialized_page(
let (lsn, read_guard) = cache.lookup_materialized_page(
self.tenantid,
self.timelineid,
*rel_tag,
@@ -1571,6 +1568,7 @@ impl LayeredTimeline {
// call it again on the predecessor layer until we have all the required data.
let mut layer_ref = layer;
let mut curr_lsn = lsn;
let mut cacheable_result: Option<Lsn> = None;
loop {
match layer_ref.get_page_reconstruct_data(
blknum,
@@ -1578,7 +1576,15 @@ impl LayeredTimeline {
cached_lsn_opt,
&mut data,
)? {
PageReconstructResult::Complete => break,
PageReconstructResult::Complete => {
if curr_lsn == lsn {
// We have an opportunity to cache this page
if let Some((rec_lsn, _rec)) = data.records.first() {
cacheable_result = Some(*rec_lsn);
}
}
break;
}
PageReconstructResult::Continue(cont_lsn) => {
// Fetch base image / more WAL from the returned predecessor layer
if let Some((cont_layer, cont_lsn)) = self.get_layer_for_read(seg, cont_lsn)? {
@@ -1634,7 +1640,13 @@ impl LayeredTimeline {
}
}
self.reconstruct_page(seg.rel, blknum, lsn, data)
let img = self.reconstruct_page(seg.rel, blknum, lsn, data)?;
if let Some(cache_lsn) = cacheable_result {
layer_ref.cache_page_image(blknum, cache_lsn, &img)?;
}
Ok(img)
}
///
@@ -1698,7 +1710,8 @@ impl LayeredTimeline {
)?;
if let RelishTag::Relation(rel_tag) = &rel {
page_cache::get().memorize_materialized_page(
let cache = page_cache::get();
cache.memorize_materialized_page(
self.tenantid,
self.timelineid,
*rel_tag,
@@ -1774,7 +1787,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_wal_record(lsn, blknum, rec);
let delta_size = layer.put_wal_record(lsn, blknum, rec)?;
self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32);
Ok(())
@@ -1793,7 +1806,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_page_image(blknum, lsn, img);
let delta_size = layer.put_page_image(blknum, lsn, img)?;
self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32);
@@ -1943,32 +1956,3 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
path
))
}
//----- Global layer management
/// Check if too much memory is being used by open layers. If so, evict
pub fn evict_layer_if_needed(conf: &PageServerConf) -> Result<()> {
// Keep evicting layers until we are below the memory threshold.
let mut global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
while let Some((layer_id, layer)) = global_layer_map.find_victim_if_needed(conf.open_mem_limit)
{
drop(global_layer_map);
let tenantid = layer.get_tenant_id();
let timelineid = layer.get_timeline_id();
let _entered =
info_span!("global evict", timeline = %timelineid, tenant = %tenantid).entered();
info!("evicting {}", layer.filename().display());
drop(layer);
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
timeline
.upgrade_to_layered_timeline()
.evict_layer(layer_id)?;
global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
}
Ok(())
}

View File

@@ -1,4 +1,4 @@
use std::io::Write;
use std::io::{Read, Write};
use std::os::unix::prelude::FileExt;
use anyhow::Result;
@@ -29,14 +29,14 @@ impl<W: Write> BlobWriter<W> {
Self { writer, offset: 0 }
}
pub fn write_blob(&mut self, blob: &[u8]) -> Result<BlobRange> {
self.writer.write_all(blob)?;
pub fn write_blob_from_reader(&mut self, r: &mut impl Read) -> Result<BlobRange> {
let len = std::io::copy(r, &mut self.writer)?;
let range = BlobRange {
offset: self.offset,
size: blob.len(),
size: len as usize,
};
self.offset += blob.len() as u64;
self.offset += len as u64;
Ok(range)
}

View File

@@ -39,6 +39,7 @@
//!
use crate::layered_repository::blob::BlobWriter;
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::page_versions::PageVersions;
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
};
@@ -373,14 +374,14 @@ impl DeltaLayer {
}
/// Create a new delta file, using the given page versions and relsizes.
/// The page versions are passed by an iterator; the iterator must return
/// page versions in blknum+lsn order.
/// The page versions are passed in a PageVersions struct. If 'cutoff' is
/// given, only page versions with LSN < cutoff are included.
///
/// This is used to write the in-memory layer to disk. The in-memory layer uses the same
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
/// expedient.
/// This is used to write the in-memory layer to disk. The page_versions and
/// relsizes are thus passed in the same format as they are in the in-memory
/// layer, as that's expedient.
#[allow(clippy::too_many_arguments)]
pub fn create<'a>(
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
@@ -388,7 +389,8 @@ impl DeltaLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
page_versions: &PageVersions,
cutoff: Option<Lsn>,
relsizes: VecMap<Lsn, u32>,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
@@ -427,9 +429,10 @@ impl DeltaLayer {
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
for (blknum, lsn, page_version) in page_versions {
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
let page_versions_iter = page_versions.ordered_page_version_iter(cutoff);
for (blknum, lsn, pos) in page_versions_iter {
let blob_range =
page_version_writer.write_blob_from_reader(&mut page_versions.reader(pos)?)?;
inner
.page_version_metas
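
With this change, DeltaLayer::create no longer receives materialized PageVersion values. Instead, PageVersions::reader(pos) hands back a bounded reader over one length-prefixed record, and BlobWriter::write_blob_from_reader streams it into the delta file with std::io::copy, so a page version is never buffered whole on the way out. A small sketch of that bounded-read-plus-copy pattern, using plain std types as stand-ins:

use std::io::{copy, Cursor, Read, Result};

fn main() -> Result<()> {
    // 'Take' limits the reader to one record's bytes, playing the role of
    // PageVersionReader's pos..end_pos window over the ephemeral file.
    let file = Cursor::new(b"first record|second record".to_vec());
    let mut record = file.take(12);

    // write_blob_from_reader boils down to std::io::copy into the writer,
    // so the record never has to be materialized as a separate buffer.
    let mut delta_chapter: Vec<u8> = Vec::new();
    let n = copy(&mut record, &mut delta_chapter)?;
    assert_eq!(n, 12);
    assert_eq!(delta_chapter, b"first record".to_vec());
    Ok(())
}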

View File

@@ -0,0 +1,295 @@
use crate::page_cache;
use crate::page_cache::PAGE_SZ;
use crate::page_cache::{ReadBufResult, WriteBufResult};
use crate::virtual_file::VirtualFile;
use crate::PageServerConf;
use lazy_static::lazy_static;
use std::cmp::min;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::ZTimelineId;
use std::os::unix::fs::FileExt;
lazy_static! {
///
/// This is the global cache of file descriptors (File objects).
///
static ref EPHEMERAL_FILES: RwLock<EphemeralFiles> = RwLock::new(EphemeralFiles {
next_file_id: 1,
files: HashMap::new(),
});
}
pub struct EphemeralFiles {
next_file_id: u64,
files: HashMap<u64, Arc<VirtualFile>>,
}
pub struct EphemeralFile {
file_id: u64,
_tenantid: ZTenantId,
_timelineid: ZTimelineId,
file: Arc<VirtualFile>,
pos: u64,
}
impl EphemeralFile {
pub fn create(
conf: &PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
) -> Result<EphemeralFile, std::io::Error> {
let mut l = EPHEMERAL_FILES.write().unwrap();
let file_id = l.next_file_id;
l.next_file_id += 1;
let filename = conf
.timeline_path(&timelineid, &tenantid)
.join(PathBuf::from(format!("ephemeral-{}", file_id)));
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)?;
let file_rc = Arc::new(file);
l.files.insert(file_id, file_rc.clone());
Ok(EphemeralFile {
file_id,
_tenantid: tenantid,
_timelineid: timelineid,
file: file_rc,
pos: 0,
})
}
pub fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), Error> {
let mut off = 0;
while off < PAGE_SZ {
let n = self
.file
.read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?;
if n == 0 {
// Reached EOF. Fill the rest of the buffer with zeros.
const ZERO_BUF: [u8; PAGE_SZ] = [0u8; PAGE_SZ];
buf[off..].copy_from_slice(&ZERO_BUF[off..]);
break;
}
off += n as usize;
}
Ok(())
}
}
impl FileExt for EphemeralFile {
fn read_at(&self, dstbuf: &mut [u8], offset: u64) -> Result<usize, Error> {
// Look up the right page
let blkno = (offset / PAGE_SZ as u64) as u32;
let off = offset as usize % PAGE_SZ;
let len = min(PAGE_SZ - off, dstbuf.len());
let read_guard;
let mut write_guard;
let cache = page_cache::get();
let buf = match cache.read_ephemeral_buf(self.file_id, blkno) {
ReadBufResult::Found(guard) => {
read_guard = guard;
read_guard.as_ref()
}
ReadBufResult::NotFound(guard) => {
// Read the page from disk into the buffer
write_guard = guard;
self.fill_buffer(write_guard.deref_mut(), blkno)?;
write_guard.mark_valid();
// And then fall through to read the requested slice from the
// buffer.
write_guard.as_ref()
}
};
dstbuf[0..len].copy_from_slice(&buf[off..(off + len)]);
Ok(len)
}
fn write_at(&self, srcbuf: &[u8], offset: u64) -> Result<usize, Error> {
// Look up the right page
let blkno = (offset / PAGE_SZ as u64) as u32;
let off = offset as usize % PAGE_SZ;
let len = min(PAGE_SZ - off, srcbuf.len());
let mut write_guard;
let cache = page_cache::get();
let buf = match cache.write_ephemeral_buf(self.file_id, blkno) {
WriteBufResult::Found(guard) => {
write_guard = guard;
write_guard.deref_mut()
}
WriteBufResult::NotFound(guard) => {
// Read the page from disk into the buffer
// TODO: if we're overwriting the whole page, no need to read it in first
write_guard = guard;
self.fill_buffer(write_guard.deref_mut(), blkno)?;
write_guard.mark_valid();
// And then fall through to modify it.
write_guard.deref_mut()
}
};
buf[off..(off + len)].copy_from_slice(&srcbuf[0..len]);
write_guard.mark_dirty();
Ok(len)
}
}
impl Write for EphemeralFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
let n = self.write_at(buf, self.pos)?;
self.pos += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
todo!()
}
}
impl Seek for EphemeralFile {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos {
SeekFrom::Start(offset) => {
self.pos = offset;
}
SeekFrom::End(_offset) => {
return Err(Error::new(
ErrorKind::Other,
"SeekFrom::End not supported by EphemeralFile",
));
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
}
if pos > u64::MAX as i128 {
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
}
self.pos = pos as u64;
}
}
Ok(self.pos)
}
}
impl Drop for EphemeralFile {
fn drop(&mut self) {
// drop all pages from page cache
let cache = page_cache::get();
cache.drop_buffers_for_ephemeral(self.file_id);
// remove entry from the hash map
EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id);
// unlink file
// FIXME: print error
let _ = std::fs::remove_file(&self.file.path);
}
}
pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Error> {
if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) {
file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64)?;
Ok(())
} else {
Err(std::io::Error::new(
ErrorKind::Other,
"could not write back page, not found in ephemeral files hash",
))
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::fs;
use std::str::FromStr;
fn repo_harness(
test_name: &str,
) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId), Error> {
let repo_dir = PageServerConf::test_repo_dir(test_name);
let _ = fs::remove_dir_all(&repo_dir);
let conf = PageServerConf::dummy_conf(repo_dir);
// Make a static copy of the config. This can never be free'd, but that's
// OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
let tenantid = ZTenantId::from_str("11000000000000000000000000000000").unwrap();
let timelineid = ZTimelineId::from_str("22000000000000000000000000000000").unwrap();
fs::create_dir_all(conf.timeline_path(&timelineid, &tenantid))?;
Ok((conf, tenantid, timelineid))
}
// Helper function to slurp the contents of a file at the given offset
// into a string
fn read_string(efile: &EphemeralFile, offset: u64, len: usize) -> Result<String, Error> {
let mut buf = Vec::new();
buf.resize(len, 0u8);
efile.read_exact_at(&mut buf, offset)?;
Ok(String::from_utf8_lossy(&buf)
.trim_end_matches('\0')
.to_string())
}
#[test]
fn test_ephemeral_files() -> Result<(), Error> {
let (conf, tenantid, timelineid) = repo_harness("ephemeral_files")?;
let mut file_a = EphemeralFile::create(conf, tenantid, timelineid)?;
file_a.write_all(b"foo")?;
assert_eq!("foo", read_string(&file_a, 0, 20)?);
file_a.write_all(b"bar")?;
assert_eq!("foobar", read_string(&file_a, 0, 20)?);
// Open a lot of files, enough to cause some page evictions.
let mut efiles = Vec::new();
for fileno in 0..100 {
let mut efile = EphemeralFile::create(conf, tenantid, timelineid)?;
efile.write_all(format!("file {}", fileno).as_bytes())?;
assert_eq!(format!("file {}", fileno), read_string(&efile, 0, 10)?);
efiles.push((fileno, efile));
}
// Check that all the files can still be read from. Use them in random order for
// good measure.
efiles.as_mut_slice().shuffle(&mut thread_rng());
for (fileno, efile) in efiles.iter_mut() {
assert_eq!(format!("file {}", fileno), read_string(efile, 0, 10)?);
}
Ok(())
}
}

View File

@@ -10,10 +10,10 @@
//! The ID can be used to relocate the layer later, without having to hold locks.
//!
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, RwLock};
use super::inmemory_layer::InMemoryLayer;
use super::inmemory_layer::OpenLayer;
use lazy_static::lazy_static;
@@ -23,15 +23,6 @@ lazy_static! {
pub static ref GLOBAL_LAYER_MAP: RwLock<OpenLayers> = RwLock::new(OpenLayers::default());
}
///
/// How much memory is being used by all the open layers? This is used to trigger
/// freezing and evicting an open layer to disk.
///
/// This is only a rough approximation, it leaves out a lot of things like malloc()
/// overhead. But as long there is enough "slop" and it's not set too close to the RAM
/// size on the system, it's good enough.
pub static GLOBAL_OPEN_MEM_USAGE: AtomicUsize = AtomicUsize::new(0);
// TODO these types can probably be smaller
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct LayerId {
@@ -40,7 +31,7 @@ pub struct LayerId {
}
enum SlotData {
Occupied(Arc<InMemoryLayer>),
Occupied(Arc<OpenLayer>),
/// Vacant slots form a linked list, the value is the index
/// of the next vacant slot in the list.
Vacant(Option<usize>),
@@ -56,14 +47,13 @@ struct Slot {
pub struct OpenLayers {
slots: Vec<Slot>,
num_occupied: usize,
next_victim: AtomicUsize,
// Head of free-slot list.
next_empty_slot_idx: Option<usize>,
}
impl OpenLayers {
pub fn insert(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
pub fn insert(&mut self, layer: Arc<OpenLayer>) -> LayerId {
let slot_idx = match self.next_empty_slot_idx {
Some(slot_idx) => slot_idx,
None => {
@@ -101,7 +91,7 @@ impl OpenLayers {
}
}
pub fn get(&self, layer_id: &LayerId) -> Option<Arc<InMemoryLayer>> {
pub fn get(&self, layer_id: &LayerId) -> Option<Arc<OpenLayer>> {
let slot = self.slots.get(layer_id.index)?; // TODO should out of bounds indexes just panic?
if slot.tag != layer_id.tag {
return None;
@@ -125,64 +115,6 @@ impl OpenLayers {
}
}
/// Find a victim layer to evict, if the total memory usage of all open layers
/// is larger than 'limit'
pub fn find_victim_if_needed(&self, limit: usize) -> Option<(LayerId, Arc<InMemoryLayer>)> {
let mem_usage = GLOBAL_OPEN_MEM_USAGE.load(Ordering::Relaxed);
if mem_usage > limit {
self.find_victim()
} else {
None
}
}
pub fn find_victim(&self) -> Option<(LayerId, Arc<InMemoryLayer>)> {
if self.num_occupied == 0 {
return None;
}
// Run the clock algorithm.
//
// FIXME: It's theoretically possible that a constant stream of get() requests
// comes in faster than we advance the clock hand, so that this never finishes.
loop {
// FIXME: Because we interpret the clock hand variable modulo slots.len(), the
// hand effectively jumps to a more or less random place whenever the array is
// expanded. That's relatively harmless, it just leads to a non-optimal choice
// of victim. Also, in a server that runs for long enough, the array should reach
// a steady-state size and not grow anymore.
let next_victim = self.next_victim.fetch_add(1, Ordering::Relaxed) % self.slots.len();
let slot = &self.slots[next_victim];
if let SlotData::Occupied(data) = &slot.data {
fn update_fn(old_usage_count: u8) -> Option<u8> {
if old_usage_count > 0 {
Some(old_usage_count - 1)
} else {
None
}
}
if slot
.usage_count
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, update_fn)
.is_err()
{
// Found a slot with usage_count == 0. Return it.
return Some((
LayerId {
index: next_victim,
tag: slot.tag,
},
Arc::clone(data),
));
}
}
}
}
// TODO this won't be a public API in the future
pub fn remove(&mut self, layer_id: &LayerId) {
let slot = &mut self.slots[layer_id.index];

View File

@@ -1,9 +1,9 @@
//!
//! FIXME
//! An in-memory layer stores recently received page versions in memory. The page versions
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
//!
use crate::layered_repository::ephemeral_file::EphemeralFile;
use crate::layered_repository::filename::DeltaFileName;
use crate::layered_repository::global_layer_map::GLOBAL_OPEN_MEM_USAGE;
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE,
};
@@ -15,17 +15,27 @@ use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{ensure, Result};
use bytes::Bytes;
use lazy_static::lazy_static;
use log::*;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use zenith_utils::vec_map::VecMap;
use zenith_utils::lsn::Lsn;
use zenith_utils::vec_map::VecMap;
use super::page_versions::PageVersions;
pub struct InMemoryLayer {
use zenith_metrics::{register_int_counter, IntCounter};
lazy_static! {
static ref LATEST_IMG_UPDATE_COUNTER: IntCounter =
register_int_counter!("latest_img_updates", "Number of updates of latest img").unwrap();
static ref LATEST_IMG_MISS_COUNTER: IntCounter =
register_int_counter!("latest_img_misses", "Number of cache misses of latest img").unwrap();
static ref LATEST_IMG_HIT_COUNTER: IntCounter =
register_int_counter!("latest_img_hits", "Number of cache hits of latest img").unwrap();
}
pub struct OpenLayer {
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
@@ -42,14 +52,14 @@ pub struct InMemoryLayer {
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: RwLock<InMemoryLayerInner>,
inner: RwLock<OpenLayerInner>,
/// Predecessor layer might be needed?
incremental: bool,
}
pub struct InMemoryLayerInner {
/// Frozen in-memory layers have an exclusive end LSN.
pub struct OpenLayerInner {
/// Frozen layers have an exclusive end LSN.
/// Writes are only allowed when this is None
end_lsn: Option<Lsn>,
@@ -71,18 +81,9 @@ pub struct InMemoryLayerInner {
/// a non-blocky rel, 'segsizes' is not used and is always empty.
///
segsizes: VecMap<Lsn, u32>,
/// Approximate amount of memory used by this layer.
///
/// TODO: This is currently a very crude metric, we don't take into account allocator
/// overhead, memory fragmentation, memory used by the VecMaps, nor many other things.
/// Just the actual # of bytes of a page image (8 kB) or the size of a WAL record.
///
/// Whenever this is changed, you must also modify GLOBAL_OPEN_MEM_USAGE accordingly!
mem_usage: usize,
}
impl InMemoryLayerInner {
impl OpenLayerInner {
fn assert_writeable(&self) {
assert!(self.end_lsn.is_none());
}
@@ -100,17 +101,9 @@ impl InMemoryLayerInner {
}
}
impl Drop for InMemoryLayerInner {
fn drop(&mut self) {
if self.mem_usage > 0 {
GLOBAL_OPEN_MEM_USAGE.fetch_sub(self.mem_usage, Ordering::Relaxed);
self.mem_usage = 0;
}
}
}
impl Layer for InMemoryLayer {
// An in-memory layer doesn't really have a filename as it's not stored on disk,
impl Layer for OpenLayer {
// FIXME
// An open layer doesn't really have a filename as it's not stored on disk,
// but we construct a filename as if it was a delta layer
fn filename(&self) -> PathBuf {
let inner = self.inner.read().unwrap();
@@ -179,13 +172,15 @@ impl Layer for InMemoryLayer {
{
let inner = self.inner.read().unwrap();
let latest = inner.page_versions.get_latest(blknum);
// Scan the page versions backwards, starting from `lsn`.
let iter = inner
.page_versions
.get_block_lsn_range(blknum, ..=lsn)
.iter()
.rev();
for (entry_lsn, pv) in iter {
for (entry_lsn, pos) in iter {
match &cached_img_lsn {
Some(cached_lsn) if entry_lsn <= cached_lsn => {
return Ok(PageReconstructResult::Cached)
@@ -193,13 +188,26 @@ impl Layer for InMemoryLayer {
_ => {}
}
let pv = inner.page_versions.get_page_version(*pos)?;
match pv {
PageVersion::Page(img) => {
reconstruct_data.page_img = Some(img.clone());
reconstruct_data.page_img = Some(img);
need_image = false;
break;
}
PageVersion::Wal(rec) => {
if let Some((latest_lsn, latest_pos)) = latest {
if latest_lsn == entry_lsn {
// we had this cached, nice!
let img = inner.page_versions.fetch_cached_latest(*latest_pos)?;
reconstruct_data.page_img = Some(img);
need_image = false;
LATEST_IMG_HIT_COUNTER.inc();
break;
}
}
LATEST_IMG_MISS_COUNTER.inc();
reconstruct_data.records.push((*entry_lsn, rec.clone()));
if rec.will_init {
// This WAL record initializes the page, so no need to go further back
@@ -225,6 +233,14 @@ impl Layer for InMemoryLayer {
}
}
fn cache_page_image(&self, blknum: u32, lsn: Lsn, img: &[u8]) -> Result<()> {
let mut inner = self.inner.write().unwrap();
LATEST_IMG_UPDATE_COUNTER.inc();
inner.page_versions.cache_latest(blknum, lsn, img)
}
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
assert!(lsn >= self.start_lsn);
@@ -293,7 +309,8 @@ impl Layer for InMemoryLayer {
println!("segsizes {}: {}", k, v);
}
for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
for (blknum, lsn, pos) in inner.page_versions.ordered_page_version_iter(None) {
let pv = inner.page_versions.get_page_version(pos)?;
let pv_description = match pv {
PageVersion::Page(_img) => "page",
PageVersion::Wal(_rec) => "wal",
@@ -312,7 +329,7 @@ pub struct LayersOnDisk {
pub image_layers: Vec<ImageLayer>,
}
impl InMemoryLayer {
impl OpenLayer {
/// Return the oldest page version that's stored in this layer
pub fn get_oldest_pending_lsn(&self) -> Lsn {
self.oldest_pending_lsn
@@ -328,7 +345,7 @@ impl InMemoryLayer {
seg: SegmentTag,
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
) -> Result<InMemoryLayer> {
) -> Result<OpenLayer> {
trace!(
"initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
seg,
@@ -342,7 +359,9 @@ impl InMemoryLayer {
segsizes.append(start_lsn, 0).unwrap();
}
Ok(InMemoryLayer {
let file = EphemeralFile::create(conf, tenantid, timelineid)?;
Ok(OpenLayer {
conf,
timelineid,
tenantid,
@@ -350,12 +369,11 @@ impl InMemoryLayer {
start_lsn,
oldest_pending_lsn,
incremental: false,
inner: RwLock::new(InMemoryLayerInner {
inner: RwLock::new(OpenLayerInner {
end_lsn: None,
dropped: false,
page_versions: PageVersions::default(),
page_versions: PageVersions::new(file),
segsizes,
mem_usage: 0,
}),
})
}
@@ -363,18 +381,18 @@ impl InMemoryLayer {
// Write operations
/// Remember new page version, as a WAL record over previous version
pub fn put_wal_record(&self, lsn: Lsn, blknum: u32, rec: WALRecord) -> u32 {
pub fn put_wal_record(&self, lsn: Lsn, blknum: u32, rec: WALRecord) -> Result<u32> {
self.put_page_version(blknum, lsn, PageVersion::Wal(rec))
}
/// Remember new page version, as a full page image
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 {
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<u32> {
self.put_page_version(blknum, lsn, PageVersion::Page(img))
}
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> u32 {
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<u32> {
assert!(self.seg.blknum_in_seg(blknum));
trace!(
@@ -388,14 +406,7 @@ impl InMemoryLayer {
inner.assert_writeable();
let mut mem_usage = 0;
mem_usage += match &pv {
PageVersion::Page(img) => img.len(),
PageVersion::Wal(rec) => rec.rec.len(),
};
let (old, delta_size) = inner.page_versions.append_or_update_last(blknum, lsn, pv);
mem_usage += delta_size;
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv)?;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
@@ -405,8 +416,6 @@ impl InMemoryLayer {
);
}
let mut delta_logical_size = 0;
// Also update the relation size, if this extended the relation.
if self.seg.rel.is_blocky() {
let newsize = blknum - self.seg.segno * RELISH_SEG_SIZE + 1;
@@ -439,10 +448,9 @@ impl InMemoryLayer {
gapblknum,
blknum
);
let (old, delta_size) = inner
let old = inner
.page_versions
.append_or_update_last(gapblknum, lsn, zeropv);
mem_usage += delta_size;
.append_or_update_last(gapblknum, lsn, zeropv)?;
// We already had an entry for this LSN. That's odd..
if old.is_some() {
@@ -453,18 +461,12 @@ impl InMemoryLayer {
}
}
let (_old, delta_size) =
inner.segsizes.append_or_update_last(lsn, newsize).unwrap();
mem_usage += delta_size;
delta_logical_size = newsize - oldsize;
inner.segsizes.append_or_update_last(lsn, newsize).unwrap();
return Ok(newsize - oldsize);
}
}
inner.mem_usage += mem_usage;
GLOBAL_OPEN_MEM_USAGE.fetch_add(mem_usage, Ordering::Relaxed);
delta_logical_size
Ok(0)
}
/// Remember that the relation was truncated at given LSN
@@ -481,9 +483,7 @@ impl InMemoryLayer {
let oldsize = inner.get_seg_size(lsn);
assert!(segsize < oldsize);
let (old, delta_size) = inner.segsizes.append_or_update_last(lsn, segsize).unwrap();
inner.mem_usage += delta_size;
GLOBAL_OPEN_MEM_USAGE.fetch_add(delta_size, Ordering::Relaxed);
let (old, _delta_size) = inner.segsizes.append_or_update_last(lsn, segsize).unwrap();
if old.is_some() {
// We already had an entry for this LSN. That's odd..
@@ -505,7 +505,7 @@ impl InMemoryLayer {
}
///
/// Initialize a new InMemoryLayer for, by copying the state at the given
/// Initialize a new OpenLayer by copying the state at the given
/// point in time from the given existing layer.
///
pub fn create_successor_layer(
@@ -515,14 +515,14 @@ impl InMemoryLayer {
tenantid: ZTenantId,
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
) -> Result<InMemoryLayer> {
) -> Result<OpenLayer> {
let seg = src.get_seg_tag();
assert!(oldest_pending_lsn.is_aligned());
assert!(oldest_pending_lsn >= start_lsn);
trace!(
"initializing new InMemoryLayer for writing {} on timeline {} at {}",
"initializing new OpenLayer for writing {} on timeline {} at {}",
seg,
timelineid,
start_lsn,
@@ -535,7 +535,9 @@ impl InMemoryLayer {
segsizes.append(start_lsn, size).unwrap();
}
Ok(InMemoryLayer {
let file = EphemeralFile::create(conf, tenantid, timelineid)?;
Ok(OpenLayer {
conf,
timelineid,
tenantid,
@@ -543,12 +545,11 @@ impl InMemoryLayer {
start_lsn,
oldest_pending_lsn,
incremental: true,
inner: RwLock::new(InMemoryLayerInner {
inner: RwLock::new(OpenLayerInner {
end_lsn: None,
dropped: false,
page_versions: PageVersions::default(),
page_versions: PageVersions::new(file),
segsizes,
mem_usage: 0,
}),
})
}
@@ -578,16 +579,6 @@ impl InMemoryLayer {
for (_blk, lsn, _pv) in inner.page_versions.ordered_page_version_iter(None) {
assert!(lsn <= end_lsn);
}
// It's a bit premature to subtract the global mem usage here already.
// This layer consumes memory until it's written out to disk and dropped.
// But GLOBAL_OPEN_MEM_USAGE is used to trigger layer eviction, if there are
// too many open layers, and from that point of view this should no longer be
// counted against the global mem usage.
if inner.mem_usage > 0 {
GLOBAL_OPEN_MEM_USAGE.fetch_sub(inner.mem_usage, Ordering::Relaxed);
inner.mem_usage = 0;
}
}
}
@@ -627,7 +618,8 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn_exclusive,
true,
inner.page_versions.ordered_page_version_iter(None),
&inner.page_versions,
None,
inner.segsizes.clone(),
)?;
trace!(
@@ -644,13 +636,9 @@ impl InMemoryLayer {
// Since `end_lsn` is inclusive, subtract 1.
// We want to make an ImageLayer for the last included LSN,
// so the DeltaLayer should exlcude that LSN.
// so the DeltaLayer should exclude that LSN.
let end_lsn_inclusive = Lsn(end_lsn_exclusive.0 - 1);
let mut page_versions = inner
.page_versions
.ordered_page_version_iter(Some(end_lsn_inclusive));
let mut delta_layers = Vec::new();
if self.start_lsn != end_lsn_inclusive {
@@ -664,7 +652,8 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn_inclusive,
false,
page_versions,
&inner.page_versions,
Some(end_lsn_inclusive),
segsizes,
)?;
delta_layers.push(delta_layer);
@@ -675,7 +664,11 @@ impl InMemoryLayer {
end_lsn_inclusive
);
} else {
assert!(page_versions.next().is_none());
assert!(inner
.page_versions
.ordered_page_version_iter(None)
.next()
.is_none());
}
drop(inner);
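
This is where the experimental latest-image cache from the first commit pays off: while scanning a block's page versions backwards, if the newest version has a materialized image cached at exactly the LSN being examined, that image already includes the record's effects and the scan can stop. A toy, single-block sketch of the short-circuit (illustrative types only; the real code reads the image back from the ephemeral file):

use std::collections::HashMap;

enum PageVersion {
    Page(Vec<u8>), // a materialized page image
    Wal(Vec<u8>),  // a WAL record to replay on top of an older image
}

struct Versions {
    by_lsn: HashMap<u32, Vec<(u64, PageVersion)>>, // per block, ascending LSN
    latest_img: HashMap<u32, (u64, Vec<u8>)>,      // blknum -> (lsn, cached image)
}

// Returns the base image plus the WAL records (newest first) left to replay.
fn reconstruct(v: &Versions, blknum: u32, lsn: u64) -> (Option<Vec<u8>>, Vec<&Vec<u8>>) {
    let mut records = Vec::new();
    let versions = match v.by_lsn.get(&blknum) {
        Some(versions) => versions,
        None => return (None, records),
    };
    for (entry_lsn, pv) in versions.iter().rev().filter(|(l, _)| *l <= lsn) {
        match pv {
            PageVersion::Page(img) => return (Some(img.clone()), records),
            PageVersion::Wal(rec) => {
                // Short-circuit: an image cached at exactly this LSN already
                // includes this record's effects, so stop scanning here.
                if let Some((cached_lsn, img)) = v.latest_img.get(&blknum) {
                    if cached_lsn == entry_lsn {
                        return (Some(img.clone()), records);
                    }
                }
                records.push(rec);
            }
        }
    }
    (None, records)
}

fn main() {
    let mut by_lsn = HashMap::new();
    by_lsn.insert(
        7,
        vec![
            (0x10, PageVersion::Wal(b"rec@0x10".to_vec())),
            (0x20, PageVersion::Wal(b"rec@0x20".to_vec())),
        ],
    );
    let mut latest_img = HashMap::new();
    latest_img.insert(7, (0x20, vec![0u8; 8192]));
    let v = Versions { by_lsn, latest_img };
    // Hit: the image cached at 0x20 short-circuits the scan immediately.
    let (img, records) = reconstruct(&v, 7, 0x20);
    assert!(img.is_some() && records.is_empty());
}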

View File

@@ -11,7 +11,7 @@
use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree};
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::layered_repository::InMemoryLayer;
use crate::layered_repository::OpenLayer;
use crate::relish::*;
use anyhow::Result;
use lazy_static::lazy_static;
@@ -67,7 +67,7 @@ impl LayerMap {
/// Get the open layer for given segment for writing. Or None if no open
/// layer exists.
///
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> {
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<OpenLayer>> {
let segentry = self.segs.get(tag)?;
segentry
@@ -78,7 +78,7 @@ impl LayerMap {
///
/// Insert an open in-memory layer
///
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
pub fn insert_open(&mut self, layer: Arc<OpenLayer>) {
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
let layer_id = segentry.update_open(Arc::clone(&layer));
@@ -214,7 +214,7 @@ impl LayerMap {
}
/// Return the oldest in-memory layer, along with its generation number.
pub fn peek_oldest_open(&mut self) -> Option<(LayerId, Arc<InMemoryLayer>, u64)> {
pub fn peek_oldest_open(&mut self) -> Option<(LayerId, Arc<OpenLayer>, u64)> {
let global_map = GLOBAL_LAYER_MAP.read().unwrap();
while let Some(oldest_entry) = self.open_layers.peek() {
@@ -279,7 +279,7 @@ impl IntervalItem for dyn Layer {
/// Per-segment entry in the LayerMap::segs hash map. Holds all the layers
/// associated with the segment.
///
/// The last layer that is open for writes is always an InMemoryLayer,
/// The last layer that is open for writes is always an OpenLayer,
/// and is kept in a separate field, because there can be only one for
/// each segment. The older layers, stored on disk, are kept in an
/// IntervalTree.
@@ -323,7 +323,7 @@ impl SegEntry {
// Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer,
// but only if it is not writeable anymore.
pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
pub fn update_open(&mut self, layer: Arc<OpenLayer>) -> LayerId {
if let Some(prev_open_layer_id) = &self.open_layer_id {
if let Some(prev_open_layer) = GLOBAL_LAYER_MAP.read().unwrap().get(prev_open_layer_id)
{
@@ -414,18 +414,25 @@ mod tests {
forknum: 0,
});
/// Construct a dummy InMemoryLayer for testing
fn dummy_inmem_layer(
lazy_static! {
static ref DUMMY_TIMELINEID: ZTimelineId =
ZTimelineId::from_str("00000000000000000000000000000000").unwrap();
static ref DUMMY_TENANTID: ZTenantId =
ZTenantId::from_str("00000000000000000000000000000000").unwrap();
}
/// Construct a dummy OpenLayer for testing
fn dummy_open_layer(
conf: &'static PageServerConf,
segno: u32,
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
) -> Arc<InMemoryLayer> {
) -> Arc<OpenLayer> {
Arc::new(
InMemoryLayer::create(
OpenLayer::create(
conf,
ZTimelineId::from_str("00000000000000000000000000000000").unwrap(),
ZTenantId::from_str("00000000000000000000000000000000").unwrap(),
*DUMMY_TIMELINEID,
*DUMMY_TENANTID,
SegmentTag {
rel: TESTREL_A,
segno,
@@ -439,20 +446,21 @@ mod tests {
#[test]
fn test_open_layers() -> Result<()> {
let conf = PageServerConf::dummy_conf(PageServerConf::test_repo_dir("dummy_inmem_layer"));
let conf = PageServerConf::dummy_conf(PageServerConf::test_repo_dir("dummy_open_layer"));
let conf = Box::leak(Box::new(conf));
std::fs::create_dir_all(conf.timeline_path(&DUMMY_TIMELINEID, &DUMMY_TENANTID))?;
let mut layers = LayerMap::default();
let gen1 = layers.increment_generation();
layers.insert_open(dummy_inmem_layer(conf, 0, Lsn(0x100), Lsn(0x100)));
layers.insert_open(dummy_inmem_layer(conf, 1, Lsn(0x100), Lsn(0x200)));
layers.insert_open(dummy_inmem_layer(conf, 2, Lsn(0x100), Lsn(0x120)));
layers.insert_open(dummy_inmem_layer(conf, 3, Lsn(0x100), Lsn(0x110)));
layers.insert_open(dummy_open_layer(conf, 0, Lsn(0x100), Lsn(0x100)));
layers.insert_open(dummy_open_layer(conf, 1, Lsn(0x100), Lsn(0x200)));
layers.insert_open(dummy_open_layer(conf, 2, Lsn(0x100), Lsn(0x120)));
layers.insert_open(dummy_open_layer(conf, 3, Lsn(0x100), Lsn(0x110)));
let gen2 = layers.increment_generation();
layers.insert_open(dummy_inmem_layer(conf, 4, Lsn(0x100), Lsn(0x110)));
layers.insert_open(dummy_inmem_layer(conf, 5, Lsn(0x100), Lsn(0x100)));
layers.insert_open(dummy_open_layer(conf, 4, Lsn(0x100), Lsn(0x110)));
layers.insert_open(dummy_open_layer(conf, 5, Lsn(0x100), Lsn(0x100)));
// A helper function (closure) to pop the next oldest open entry from the layer map,
// and assert that it is what we'd expect

View File

@@ -1,40 +1,121 @@
//!
//! Data structure to ingest incoming WAL into an append-only file.
//!
//! - The file is considered temporary, and will be discarded on crash
//! - based on a B-tree
//!
use std::os::unix::fs::FileExt;
use std::{collections::HashMap, ops::RangeBounds, slice};
use anyhow::Result;
use bytes::{Bytes, BytesMut};
use std::cmp::min;
use std::io::{Seek, SeekFrom};
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
use crate::layered_repository::ephemeral_file::EphemeralFile;
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
use postgres_ffi::pg_constants::BLCKSZ;
#[derive(Debug, Default)]
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
use zenith_utils::bin_ser::LeSer;
const EMPTY_SLICE: &[(Lsn, u64)] = &[];
pub struct PageVersions {
map: HashMap<u32, VecMap<Lsn, u64>>,
latest_map: HashMap<u32, (Lsn, u64)>,
/// The PageVersion structs are stored in a serialized format in this file.
/// Each serialized PageVersion is preceded by a 'u32' length field.
/// The 'map' stores offsets into this file.
file: EphemeralFile,
}
impl PageVersions {
pub fn new(file: EphemeralFile) -> PageVersions {
PageVersions {
map: HashMap::new(),
latest_map: HashMap::new(),
file,
}
}
pub fn cache_latest(&mut self, blknum: u32, lsn: Lsn, img: &[u8]) -> Result<()> {
if img.len() != BLCKSZ as usize {
return Ok(());
}
let pos = if let Some((_lsn, pos)) = self.latest_map.get(&blknum) {
*pos
} else {
let pos = self.file.stream_position()?;
// round up to nearest page boundary for performance
//let pos = (pos + BLCKSZ as u64 - 1) & !(BLCKSZ as u64 - 1);
self.file.seek(SeekFrom::Start(pos + BLCKSZ as u64))?;
pos
};
self.file.write_all_at(img, pos)?;
self.latest_map.insert(blknum, (lsn, pos));
Ok(())
}
pub fn get_latest(&self, blknum: u32) -> Option<&(Lsn, u64)> {
self.latest_map.get(&blknum)
}
pub fn fetch_cached_latest(&self, pos: u64) -> Result<Bytes, std::io::Error> {
let mut buf = BytesMut::with_capacity(BLCKSZ as usize);
buf.resize(BLCKSZ as usize, 0u8);
if let Err(err) = self.file.read_exact_at(buf.as_mut(), pos) {
tracing::error!("read_exact_at {} failed: {:?}", pos, err);
}
Ok(buf.freeze())
}
pub fn append_or_update_last(
&mut self,
blknum: u32,
lsn: Lsn,
page_version: PageVersion,
) -> (Option<PageVersion>, usize) {
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
map.append_or_update_last(lsn, page_version).unwrap()
) -> Result<Option<u64>> {
// remember starting position
let pos = self.file.stream_position()?;
// make room for the 'length' field by writing zeros as a placeholder.
self.file.seek(SeekFrom::Start(pos + 4)).unwrap();
page_version.ser_into(&mut self.file).unwrap();
// write the 'length' field.
let len = self.file.stream_position()? - pos - 4;
let lenbuf = u32::to_ne_bytes(len as u32);
self.file.write_all_at(&lenbuf, pos)?;
let map = self.map.entry(blknum).or_insert_with(VecMap::default);
Ok(map.append_or_update_last(lsn, pos as u64).unwrap().0)
}
/// Get all [`PageVersion`]s in a block
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
self.0
fn get_block_slice(&self, blknum: u32) -> &[(Lsn, u64)] {
self.map
.get(&blknum)
.map(VecMap::as_slice)
.unwrap_or(EMPTY_SLICE)
}
/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
&self,
blknum: u32,
range: R,
) -> &[(Lsn, PageVersion)] {
self.0
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(&self, blknum: u32, range: R) -> &[(Lsn, u64)] {
self.map
.get(&blknum)
.map(|vec_map| vec_map.slice_range(range))
.unwrap_or(EMPTY_SLICE)
@@ -43,7 +124,7 @@ impl PageVersions {
/// Iterate through [`PageVersion`]s in (block, lsn) order.
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
let mut ordered_blocks: Vec<u32> = self.map.keys().cloned().collect();
ordered_blocks.sort_unstable();
let slice = ordered_blocks
@@ -59,6 +140,40 @@ impl PageVersions {
cur_slice_iter: slice.iter(),
}
}
/// Returns a 'Read' that reads the page version at given offset.
pub fn reader(&self, pos: u64) -> Result<PageVersionReader, std::io::Error> {
// read length
let mut lenbuf = [0u8; 4];
self.file.read_exact_at(&mut lenbuf, pos)?;
let len = u32::from_ne_bytes(lenbuf);
Ok(PageVersionReader {
file: &self.file,
pos: pos + 4,
end_pos: pos + 4 + len as u64,
})
}
pub fn get_page_version(&self, pos: u64) -> Result<PageVersion> {
let mut reader = self.reader(pos)?;
Ok(PageVersion::des_from(&mut reader)?)
}
}
pub struct PageVersionReader<'a> {
file: &'a EphemeralFile,
pos: u64,
end_pos: u64,
}
impl<'a> std::io::Read for PageVersionReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
let len = min(buf.len(), (self.end_pos - self.pos) as usize);
let n = self.file.read_at(&mut buf[..len], self.pos)?;
self.pos += n as u64;
Ok(n)
}
}
pub struct OrderedPageVersionIter<'a> {
@@ -69,7 +184,7 @@ pub struct OrderedPageVersionIter<'a> {
cutoff_lsn: Option<Lsn>,
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
cur_slice_iter: slice::Iter<'a, (Lsn, u64)>,
}
impl OrderedPageVersionIter<'_> {
@@ -83,14 +198,14 @@ impl OrderedPageVersionIter<'_> {
}
impl<'a> Iterator for OrderedPageVersionIter<'a> {
type Item = (u32, Lsn, &'a PageVersion);
type Item = (u32, Lsn, u64);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
if let Some((lsn, pos)) = self.cur_slice_iter.next() {
if self.is_lsn_before_cutoff(lsn) {
let blknum = self.ordered_blocks[self.cur_block_idx];
return Some((blknum, *lsn, page_version));
return Some((blknum, *lsn, *pos));
}
}
@@ -107,10 +222,34 @@ mod tests {
use bytes::Bytes;
use super::*;
use crate::PageServerConf;
use std::fs;
use std::str::FromStr;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
fn repo_harness(test_name: &str) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId)> {
let repo_dir = PageServerConf::test_repo_dir(test_name);
let _ = fs::remove_dir_all(&repo_dir);
let conf = PageServerConf::dummy_conf(repo_dir);
// Make a static copy of the config. This can never be free'd, but that's
// OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
let tenantid = ZTenantId::from_str("11000000000000000000000000000000").unwrap();
let timelineid = ZTimelineId::from_str("22000000000000000000000000000000").unwrap();
fs::create_dir_all(conf.timeline_path(&timelineid, &tenantid))?;
Ok((conf, tenantid, timelineid))
}
#[test]
fn test_ordered_iter() {
let mut page_versions = PageVersions::default();
fn test_ordered_iter() -> Result<()> {
let (conf, tenantid, timelineid) = repo_harness("test_ordered_iter")?;
let file = EphemeralFile::create(conf, tenantid, timelineid)?;
let mut page_versions = PageVersions::new(file);
const BLOCKS: u32 = 1000;
const LSNS: u64 = 50;
@@ -119,11 +258,11 @@ mod tests {
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {
let (old, _delta_size) = page_versions.append_or_update_last(
let old = page_versions.append_or_update_last(
blknum,
Lsn(lsn),
empty_page_version.clone(),
);
)?;
assert!(old.is_none());
}
}
@@ -150,5 +289,7 @@ mod tests {
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
Ok(())
}
}

View File

@@ -145,6 +145,10 @@ pub trait Layer: Send + Sync {
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult>;
fn cache_page_image(&self, _blknum: u32, _lsn: Lsn, _img: &[u8]) -> Result<()> {
Ok(())
}
/// Return size of the segment at given LSN. (Only for blocky relations.)
fn get_seg_size(&self, lsn: Lsn) -> Result<u32>;

View File

@@ -46,11 +46,13 @@ use std::{
};
use once_cell::sync::OnceCell;
use tracing::error;
use zenith_utils::{
lsn::Lsn,
zid::{ZTenantId, ZTimelineId},
};
use crate::layered_repository::writeback_ephemeral_file;
use crate::{relish::RelTag, PageServerConf};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -84,23 +86,25 @@ pub fn get() -> &'static PageCache {
}
}
const PAGE_SZ: usize = postgres_ffi::pg_constants::BLCKSZ as usize;
pub const PAGE_SZ: usize = postgres_ffi::pg_constants::BLCKSZ as usize;
const MAX_USAGE_COUNT: u8 = 5;
///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
#[derive(PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone)]
enum CacheKey {
MaterializedPage {
hash_key: MaterializedPageHashKey,
lsn: Lsn,
},
// Currently, we only store materialized page versions in the page cache.
// To cache another kind of "thing", add enum variant here.
EphemeralPage {
file_id: u64,
blkno: u32,
},
}
#[derive(PartialEq, Eq, Hash, Clone)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct MaterializedPageHashKey {
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
@@ -122,6 +126,7 @@ struct Slot {
struct SlotInner {
key: Option<CacheKey>,
buf: &'static mut [u8; PAGE_SZ],
dirty: bool,
}
impl Slot {
@@ -170,6 +175,8 @@ pub struct PageCache {
/// can have a separate mapping map, next to this field.
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
ephemeral_page_map: RwLock<HashMap<(u64, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
@@ -232,6 +239,9 @@ impl PageWriteGuard<'_> {
);
self.valid = true;
}
pub fn mark_dirty(&mut self) {
self.inner.dirty = true;
}
}
impl Drop for PageWriteGuard<'_> {
@@ -250,20 +260,20 @@ impl Drop for PageWriteGuard<'_> {
}
/// lock_for_read() return value
enum ReadBufResult<'a> {
pub enum ReadBufResult<'a> {
Found(PageReadGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
/// lock_for_write() return value
enum WriteBufResult<'a> {
pub enum WriteBufResult<'a> {
Found(PageWriteGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
impl PageCache {
//
// Section 1: Public interface functions for looking up and memorizing materialized page
// Section 1.1: Public interface functions for looking up and memorizing materialized page
// versions in the page cache
//
@@ -291,8 +301,11 @@ impl PageCache {
};
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key;
Some((lsn, guard))
if let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key {
Some((lsn, guard))
} else {
panic!("unexpected key type in slot");
}
} else {
None
}
@@ -334,6 +347,37 @@ impl PageCache {
}
}
pub fn read_ephemeral_buf(&self, file_id: u64, blkno: u32) -> ReadBufResult {
let mut cache_key = CacheKey::EphemeralPage { file_id, blkno };
self.lock_for_read(&mut cache_key)
}
pub fn write_ephemeral_buf(&self, file_id: u64, blkno: u32) -> WriteBufResult {
let cache_key = CacheKey::EphemeralPage { file_id, blkno };
self.lock_for_write(&cache_key)
}
/// Immediately drop all buffers belonging to given file, without writeback
pub fn drop_buffers_for_ephemeral(&self, drop_file_id: u64) {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];
let mut inner = slot.inner.write().unwrap();
if let Some(key) = &inner.key {
match key {
CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => {
// remove mapping for old buffer
self.remove_mapping(key);
inner.key = None;
}
_ => {}
}
}
}
}
//
// Section 2: Internal interface functions for lookup/update.
//
@@ -400,7 +444,6 @@ impl PageCache {
/// }
/// ```
///
#[allow(unused)] // this is currently unused
fn lock_for_read(&self, cache_key: &mut CacheKey) -> ReadBufResult {
loop {
// First check if the key already exists in the cache.
@@ -527,6 +570,10 @@ impl PageCache {
*lsn = version.lsn;
Some(version.slot_idx)
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
@@ -546,6 +593,10 @@ impl PageCache {
None
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
@@ -569,9 +620,14 @@ impl PageCache {
}
}
} else {
panic!()
panic!("could not find old key in mapping")
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
}
}
}
@@ -602,6 +658,16 @@ impl PageCache {
}
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
entry.insert(slot_idx);
None
}
}
}
}
}
@@ -624,11 +690,25 @@ impl PageCache {
let mut inner = slot.inner.write().unwrap();
if let Some(old_key) = &inner.key {
// TODO: if we supported storing dirty pages, this is where
// we'd need to write it disk
if inner.dirty {
if let Err(err) = Self::writeback(old_key, inner.buf) {
// Writing the page to disk failed.
//
// FIXME: What to do here, when? We could propagate the error to the
// caller, but victim buffer is generally unrelated to the original
// call. It can even belong to a different tenant. Currently, we
// report the error to the log and continue the clock sweep to find
// a different victim. But if the problem persists, the page cache
// could fill up with dirty pages that we cannot evict, and we will
// loop retrying the writebacks indefinitely.
error!("writeback of buffer {:?} failed: {}", old_key, err);
continue;
}
}
// remove mapping for old buffer
self.remove_mapping(old_key);
inner.dirty = false;
inner.key = None;
}
return (slot_idx, inner);
@@ -638,6 +718,20 @@ impl PageCache {
}
}
fn writeback(cache_key: &CacheKey, buf: &[u8]) -> Result<(), std::io::Error> {
match cache_key {
CacheKey::MaterializedPage {
hash_key: _,
lsn: _,
} => {
panic!("unexpected dirty materialize page");
}
CacheKey::EphemeralPage { file_id, blkno } => {
writeback_ephemeral_file(*file_id, *blkno, buf)
}
}
}
/// Initialize a new page cache
///
/// This should be called only once at page server startup.
@@ -652,7 +746,11 @@ impl PageCache {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
Slot {
inner: RwLock::new(SlotInner { key: None, buf }),
inner: RwLock::new(SlotInner {
key: None,
buf,
dirty: false,
}),
usage_count: AtomicU8::new(0),
}
})
@@ -660,6 +758,7 @@ impl PageCache {
Self {
materialized_page_map: Default::default(),
ephemeral_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),
}
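
The eviction path now has to handle dirty buffers: the clock sweep writes a dirty page back to its ephemeral file before reusing the slot, and on writeback failure it logs the error and keeps sweeping, as the FIXME above discusses. A stripped-down, single-threaded sketch of that pattern (toy types, no atomics or locking, unlike the real cache):

// Toy slot, standing in for page_cache::Slot + SlotInner.
struct Slot {
    key: Option<u32>, // identifies which page the buffer holds
    usage_count: u8,
    dirty: bool,
    buf: [u8; 8192],
}

/// Clock sweep: decrement usage counts until a slot hits zero, write the
/// page back first if it is dirty, and keep sweeping if writeback fails.
fn find_victim(
    slots: &mut [Slot],
    hand: &mut usize,
    writeback: impl Fn(u32, &[u8]) -> std::io::Result<()>,
) -> usize {
    loop {
        let idx = *hand % slots.len();
        *hand += 1;
        let slot = &mut slots[idx];
        if slot.usage_count > 0 {
            slot.usage_count -= 1;
            continue;
        }
        if let Some(key) = slot.key {
            if slot.dirty {
                if let Err(err) = writeback(key, &slot.buf) {
                    // Same policy as the diff: report and try another victim.
                    eprintln!("writeback of buffer {} failed: {}", key, err);
                    continue;
                }
                slot.dirty = false;
            }
            slot.key = None; // the real code also removes the key mapping here
        }
        return idx;
    }
}

fn main() {
    let mut slots: Vec<Slot> = (0..4)
        .map(|i| Slot { key: Some(i), usage_count: 1, dirty: i == 2, buf: [0; 8192] })
        .collect();
    let mut hand = 0;
    let victim = find_victim(&mut slots, &mut hand, |_key, _buf| Ok(()));
    assert!(slots[victim].key.is_none());
}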

View File

@@ -49,7 +49,7 @@ pub struct VirtualFile {
/// if a new file is created, we only pass the create flag when it's initially
/// opened, in the VirtualFile::create() function, and strip the flag before
/// storing it here.
path: PathBuf,
pub path: PathBuf,
open_options: OpenOptions,
}
@@ -357,7 +357,11 @@ impl Seek for VirtualFile {
impl FileExt for VirtualFile {
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
self.with_file(|file| file.read_at(buf, offset))?
let result = self.with_file(|file| file.read_at(buf, offset))?;
if let Err(err) = &result {
tracing::error!("read_at error: {:?}", err);
}
result
}
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {

View File

@@ -5,7 +5,6 @@
//!
//! We keep one WAL receiver active per timeline.
use crate::layered_repository;
use crate::relish::*;
use crate::restore_local_repo;
use crate::tenant_mgr;
@@ -176,7 +175,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
}
fn walreceiver_main(
conf: &PageServerConf,
_conf: &PageServerConf,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
tenantid: ZTenantId,
@@ -296,9 +295,6 @@ fn walreceiver_main(
caught_up = true;
}
// Release memory if needed
layered_repository::evict_layer_if_needed(conf)?;
Some(endlsn)
}