From 5aa969a588f4592d794640589c90cbfcef80aa55 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Thu, 11 Nov 2021 01:07:50 +0200
Subject: [PATCH] Replace in-memory layers and OOM-triggered eviction with
 temp files.

The term "in-memory layer" is a misnomer now: each in-memory layer is
actually backed by a file. The files are ephemeral, in that they don't
survive a page server crash or shutdown. To avoid reading the file for
every operation, "ephemeral files" are cached in the page cache.

This includes changes from the 'inmemory-layer-chunks' branch to
serialize the page versions when they are added to the open layer. The
difference is that they are not serialized into the expandable
in-memory "chunk buffer", but written out to the file.
---
 pageserver/src/layered_repository.rs          |  42 +--
 pageserver/src/layered_repository/README.md   |   8 +-
 pageserver/src/layered_repository/blob.rs     |  10 +-
 .../src/layered_repository/delta_layer.rs     |  23 +-
 .../src/layered_repository/ephemeral_file.rs  | 298 ++++++++++++++++++
 .../layered_repository/global_layer_map.rs    |  77 +----
 .../src/layered_repository/inmemory_layer.rs  | 121 +++----
 .../src/layered_repository/layer_map.rs       |  12 +-
 .../src/layered_repository/page_versions.rs   | 144 +++++++--
 pageserver/src/page_cache.rs                  | 149 +++++++--
 pageserver/src/virtual_file.rs                |   2 +-
 pageserver/src/walreceiver.rs                 |   6 +-
 12 files changed, 639 insertions(+), 253 deletions(-)
 create mode 100644 pageserver/src/layered_repository/ephemeral_file.rs

diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 28adc6e8d3..7ef63d2f25 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -54,6 +54,7 @@ use zenith_utils::seqwait::SeqWait;
 
 mod blob;
 mod delta_layer;
+mod ephemeral_file;
 mod filename;
 mod global_layer_map;
 mod image_layer;
@@ -74,6 +75,8 @@ use storage_layer::{
     Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
 };
 
+pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file;
+
 static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
 
 // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
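
To make the new data flow concrete: a page version is serialized into an
ephemeral file as soon as it is added, and the in-memory map only remembers
its file offset. A minimal sketch (illustrative only, not part of the patch),
using the PageVersions, EphemeralFile and PageVersion APIs added in the hunks
below, and assuming a conf/tenantid/timelineid triple like the one in the new
tests:

    let file = EphemeralFile::create(conf, tenantid, timelineid)?;
    let mut pvs = PageVersions::new(file);

    // Serialize one full-page image; only its file offset stays in memory.
    pvs.append_or_update_last(0, Lsn(1), PageVersion::Page(ZERO_PAGE.clone()))?;

    // Walk versions in (blknum, lsn) order and deserialize them back from
    // the file; reads go through the shared page cache.
    for (_blknum, _lsn, pos) in pvs.ordered_page_version_iter(None) {
        let _page_version = pvs.get_page_version(pos)?;
    }
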
@@ -1615,8 +1618,9 @@ impl LayeredTimeline {
     }
 
     fn lookup_cached_page(&self, seg: &SegmentTag, blknum: u32, lsn: Lsn) -> Option<(Lsn, Bytes)> {
+        let cache = page_cache::get();
         if let RelishTag::Relation(rel_tag) = &seg.rel {
-            let (lsn, read_guard) = page_cache::get().lookup_materialized_page(
+            let (lsn, read_guard) = cache.lookup_materialized_page(
                 self.tenantid,
                 self.timelineid,
                 *rel_tag,
@@ -1803,7 +1807,8 @@ impl LayeredTimeline {
             )?;
 
             if let RelishTag::Relation(rel_tag) = &rel {
-                page_cache::get().memorize_materialized_page(
+                let cache = page_cache::get();
+                cache.memorize_materialized_page(
                     self.tenantid,
                     self.timelineid,
                     *rel_tag,
@@ -1879,7 +1884,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
         let seg = SegmentTag::from_blknum(rel, blknum);
 
         let layer = self.tl.get_layer_for_write(seg, lsn)?;
-        let delta_size = layer.put_wal_record(lsn, blknum, rec);
+        let delta_size = layer.put_wal_record(lsn, blknum, rec)?;
 
         self.tl
             .increase_current_logical_size(delta_size * BLCKSZ as u32);
         Ok(())
@@ -1898,7 +1903,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
         let seg = SegmentTag::from_blknum(rel, blknum);
 
         let layer = self.tl.get_layer_for_write(seg, lsn)?;
-        let delta_size = layer.put_page_image(blknum, lsn, img);
+        let delta_size = layer.put_page_image(blknum, lsn, img)?;
 
         self.tl
             .increase_current_logical_size(delta_size * BLCKSZ as u32);
@@ -2048,32 +2053,3 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
         path
     ))
 }
-
-//----- Global layer management
-
-/// Check if too much memory is being used by open layers. If so, evict
-pub fn evict_layer_if_needed(conf: &PageServerConf) -> Result<()> {
-    // Keep evicting layers until we are below the memory threshold.
-    let mut global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
-    while let Some((layer_id, layer)) = global_layer_map.find_victim_if_needed(conf.open_mem_limit)
-    {
-        drop(global_layer_map);
-        let tenantid = layer.get_tenant_id();
-        let timelineid = layer.get_timeline_id();
-
-        let _entered =
-            info_span!("global evict", timeline = %timelineid, tenant = %tenantid).entered();
-        info!("evicting {}", layer.filename().display());
-        drop(layer);
-
-        let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
-
-        timeline
-            .upgrade_to_layered_timeline()
-            .evict_layer(layer_id)?;
-
-        global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
-    }
-
-    Ok(())
-}
diff --git a/pageserver/src/layered_repository/README.md b/pageserver/src/layered_repository/README.md
index f579553563..20f89ddc70 100644
--- a/pageserver/src/layered_repository/README.md
+++ b/pageserver/src/layered_repository/README.md
@@ -82,13 +82,15 @@ A layer can be in different states:
 - Open - a layer where new WAL records can be appended to.
 - Closed - a layer that is read-only, no new WAL records can be appended to it
-- Historical: synonym for closed
-- InMemory: A layer that is kept only in memory, and needs to be rebuilt from WAL
-  on pageserver start
+- Historic: synonym for closed
+- InMemory: A layer that needs to be rebuilt from WAL on pageserver start.
+To avoid OOM errors, InMemory layers can be spilled to disk into an ephemeral file.
 - OnDisk: A layer that is stored on disk. If its end-LSN is older than
   disk_consistent_lsn, it is known to be fully flushed and fsync'd to local disk.
 - Frozen layer: an in-memory layer that is Closed.
 
+TODO: Clarify the difference between Closed, Historic and Frozen.
+ There are two kinds of OnDisk layers: - ImageLayer represents an image or a snapshot of a 10 MB relish segment, at one particular LSN. - DeltaLayer represents a collection of WAL records or page images in a range of LSNs, for one diff --git a/pageserver/src/layered_repository/blob.rs b/pageserver/src/layered_repository/blob.rs index 592e279f4f..7bf6052983 100644 --- a/pageserver/src/layered_repository/blob.rs +++ b/pageserver/src/layered_repository/blob.rs @@ -1,4 +1,4 @@ -use std::io::Write; +use std::io::{Read, Write}; use std::os::unix::prelude::FileExt; use anyhow::Result; @@ -29,14 +29,14 @@ impl BlobWriter { Self { writer, offset: 0 } } - pub fn write_blob(&mut self, blob: &[u8]) -> Result { - self.writer.write_all(blob)?; + pub fn write_blob_from_reader(&mut self, r: &mut impl Read) -> Result { + let len = std::io::copy(r, &mut self.writer)?; let range = BlobRange { offset: self.offset, - size: blob.len(), + size: len as usize, }; - self.offset += blob.len() as u64; + self.offset += len as u64; Ok(range) } diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index d9d0ed37d3..43db88f74e 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -39,6 +39,7 @@ //! use crate::layered_repository::blob::BlobWriter; use crate::layered_repository::filename::{DeltaFileName, PathOrConf}; +use crate::layered_repository::page_versions::PageVersions; use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, }; @@ -377,14 +378,14 @@ impl DeltaLayer { } /// Create a new delta file, using the given page versions and relsizes. - /// The page versions are passed by an iterator; the iterator must return - /// page versions in blknum+lsn order. + /// The page versions are passed in a PageVersions struct. If 'cutoff' is + /// given, only page versions with LSN < cutoff are included. /// - /// This is used to write the in-memory layer to disk. The in-memory layer uses the same - /// data structure with two btreemaps as we do, so passing the btreemaps is currently - /// expedient. + /// This is used to write the in-memory layer to disk. The page_versions and + /// relsizes are thus passed in the same format as they are in the in-memory + /// layer, as that's expedient. #[allow(clippy::too_many_arguments)] - pub fn create<'a>( + pub fn create( conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId, @@ -392,7 +393,8 @@ impl DeltaLayer { start_lsn: Lsn, end_lsn: Lsn, dropped: bool, - page_versions: impl Iterator, + page_versions: &PageVersions, + cutoff: Option, relsizes: VecMap, ) -> Result { if seg.rel.is_blocky() { @@ -431,9 +433,10 @@ impl DeltaLayer { let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER); - for (blknum, lsn, page_version) in page_versions { - let buf = PageVersion::ser(page_version)?; - let blob_range = page_version_writer.write_blob(&buf)?; + let page_versions_iter = page_versions.ordered_page_version_iter(cutoff); + for (blknum, lsn, pos) in page_versions_iter { + let blob_range = + page_version_writer.write_blob_from_reader(&mut page_versions.reader(pos)?)?; inner .page_version_metas diff --git a/pageserver/src/layered_repository/ephemeral_file.rs b/pageserver/src/layered_repository/ephemeral_file.rs new file mode 100644 index 0000000000..9178c4261e --- /dev/null +++ b/pageserver/src/layered_repository/ephemeral_file.rs @@ -0,0 +1,298 @@ +//! 
Implementation of append-only file data structure +//! used to keep in-memory layers spilled on disk. + +use crate::page_cache; +use crate::page_cache::PAGE_SZ; +use crate::page_cache::{ReadBufResult, WriteBufResult}; +use crate::virtual_file::VirtualFile; +use crate::PageServerConf; +use lazy_static::lazy_static; +use std::cmp::min; +use std::collections::HashMap; +use std::fs::OpenOptions; +use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; +use std::ops::DerefMut; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; +use zenith_utils::zid::ZTenantId; +use zenith_utils::zid::ZTimelineId; + +use std::os::unix::fs::FileExt; + +lazy_static! { + /// + /// This is the global cache of file descriptors (File objects). + /// + static ref EPHEMERAL_FILES: RwLock = RwLock::new(EphemeralFiles { + next_file_id: 1, + files: HashMap::new(), + }); +} + +pub struct EphemeralFiles { + next_file_id: u64, + + files: HashMap>, +} + +pub struct EphemeralFile { + file_id: u64, + _tenantid: ZTenantId, + _timelineid: ZTimelineId, + file: Arc, + + pos: u64, +} + +impl EphemeralFile { + pub fn create( + conf: &PageServerConf, + tenantid: ZTenantId, + timelineid: ZTimelineId, + ) -> Result { + let mut l = EPHEMERAL_FILES.write().unwrap(); + let file_id = l.next_file_id; + l.next_file_id += 1; + + let filename = conf + .timeline_path(&timelineid, &tenantid) + .join(PathBuf::from(format!("ephemeral-{}", file_id))); + + let file = VirtualFile::open_with_options( + &filename, + OpenOptions::new().read(true).write(true).create(true), + )?; + let file_rc = Arc::new(file); + l.files.insert(file_id, file_rc.clone()); + + Ok(EphemeralFile { + file_id, + _tenantid: tenantid, + _timelineid: timelineid, + file: file_rc, + pos: 0, + }) + } + + pub fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), Error> { + let mut off = 0; + while off < PAGE_SZ { + let n = self + .file + .read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?; + + if n == 0 { + // Reached EOF. Fill the rest of the buffer with zeros. + const ZERO_BUF: [u8; PAGE_SZ] = [0u8; PAGE_SZ]; + + buf[off..].copy_from_slice(&ZERO_BUF[off..]); + break; + } + + off += n as usize; + } + Ok(()) + } +} + +impl FileExt for EphemeralFile { + fn read_at(&self, dstbuf: &mut [u8], offset: u64) -> Result { + // Look up the right page + let blkno = (offset / PAGE_SZ as u64) as u32; + let off = offset as usize % PAGE_SZ; + let len = min(PAGE_SZ - off, dstbuf.len()); + + let read_guard; + let mut write_guard; + + let cache = page_cache::get(); + let buf = match cache.read_ephemeral_buf(self.file_id, blkno) { + ReadBufResult::Found(guard) => { + read_guard = guard; + read_guard.as_ref() + } + ReadBufResult::NotFound(guard) => { + // Read the page from disk into the buffer + write_guard = guard; + self.fill_buffer(write_guard.deref_mut(), blkno)?; + write_guard.mark_valid(); + + // And then fall through to read the requested slice from the + // buffer. 
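+                // NOTE: we still hold the write guard here, so the slot
+                // cannot be evicted or modified by another thread until
+                // the guard is dropped at the end of this function.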
+ write_guard.as_ref() + } + }; + + dstbuf[0..len].copy_from_slice(&buf[off..(off + len)]); + Ok(len) + } + + fn write_at(&self, srcbuf: &[u8], offset: u64) -> Result { + // Look up the right page + let blkno = (offset / PAGE_SZ as u64) as u32; + let off = offset as usize % PAGE_SZ; + let len = min(PAGE_SZ - off, srcbuf.len()); + + let mut write_guard; + let cache = page_cache::get(); + let buf = match cache.write_ephemeral_buf(self.file_id, blkno) { + WriteBufResult::Found(guard) => { + write_guard = guard; + write_guard.deref_mut() + } + WriteBufResult::NotFound(guard) => { + // Read the page from disk into the buffer + // TODO: if we're overwriting the whole page, no need to read it in first + write_guard = guard; + self.fill_buffer(write_guard.deref_mut(), blkno)?; + write_guard.mark_valid(); + + // And then fall through to modify it. + write_guard.deref_mut() + } + }; + + buf[off..(off + len)].copy_from_slice(&srcbuf[0..len]); + write_guard.mark_dirty(); + Ok(len) + } +} + +impl Write for EphemeralFile { + fn write(&mut self, buf: &[u8]) -> Result { + let n = self.write_at(buf, self.pos)?; + self.pos += n as u64; + Ok(n) + } + + fn flush(&mut self) -> Result<(), std::io::Error> { + todo!() + } +} + +impl Seek for EphemeralFile { + fn seek(&mut self, pos: SeekFrom) -> Result { + match pos { + SeekFrom::Start(offset) => { + self.pos = offset; + } + SeekFrom::End(_offset) => { + return Err(Error::new( + ErrorKind::Other, + "SeekFrom::End not supported by EphemeralFile", + )); + } + SeekFrom::Current(offset) => { + let pos = self.pos as i128 + offset as i128; + if pos < 0 { + return Err(Error::new( + ErrorKind::InvalidInput, + "offset would be negative", + )); + } + if pos > u64::MAX as i128 { + return Err(Error::new(ErrorKind::InvalidInput, "offset overflow")); + } + self.pos = pos as u64; + } + } + Ok(self.pos) + } +} + +impl Drop for EphemeralFile { + fn drop(&mut self) { + // drop all pages from page cache + let cache = page_cache::get(); + cache.drop_buffers_for_ephemeral(self.file_id); + + // remove entry from the hash map + EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id); + + // unlink file + // FIXME: print error + let _ = std::fs::remove_file(&self.file.path); + } +} + +pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Error> { + if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) { + file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64)?; + Ok(()) + } else { + Err(std::io::Error::new( + ErrorKind::Other, + "could not write back page, not found in ephemeral files hash", + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::seq::SliceRandom; + use rand::thread_rng; + use std::fs; + use std::str::FromStr; + + fn repo_harness( + test_name: &str, + ) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId), Error> { + let repo_dir = PageServerConf::test_repo_dir(test_name); + let _ = fs::remove_dir_all(&repo_dir); + let conf = PageServerConf::dummy_conf(repo_dir); + // Make a static copy of the config. This can never be free'd, but that's + // OK in a test. 
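+        // (Box::leak intentionally leaks the Box to produce the &'static.)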
+ let conf: &'static PageServerConf = Box::leak(Box::new(conf)); + + let tenantid = ZTenantId::from_str("11000000000000000000000000000000").unwrap(); + let timelineid = ZTimelineId::from_str("22000000000000000000000000000000").unwrap(); + fs::create_dir_all(conf.timeline_path(&timelineid, &tenantid))?; + + Ok((conf, tenantid, timelineid)) + } + + // Helper function to slurp contents of a file, starting at the current position, + // into a string + fn read_string(efile: &EphemeralFile, offset: u64, len: usize) -> Result { + let mut buf = Vec::new(); + buf.resize(len, 0u8); + + efile.read_exact_at(&mut buf, offset)?; + + Ok(String::from_utf8_lossy(&buf) + .trim_end_matches('\0') + .to_string()) + } + + #[test] + fn test_ephemeral_files() -> Result<(), Error> { + let (conf, tenantid, timelineid) = repo_harness("ephemeral_files")?; + + let mut file_a = EphemeralFile::create(conf, tenantid, timelineid)?; + + file_a.write_all(b"foo")?; + assert_eq!("foo", read_string(&file_a, 0, 20)?); + + file_a.write_all(b"bar")?; + assert_eq!("foobar", read_string(&file_a, 0, 20)?); + + // Open a lot of files, enough to cause some page evictions. + let mut efiles = Vec::new(); + for fileno in 0..100 { + let mut efile = EphemeralFile::create(conf, tenantid, timelineid)?; + efile.write_all(format!("file {}", fileno).as_bytes())?; + assert_eq!(format!("file {}", fileno), read_string(&efile, 0, 10)?); + efiles.push((fileno, efile)); + } + + // Check that all the files can still be read from. Use them in random order for + // good measure. + efiles.as_mut_slice().shuffle(&mut thread_rng()); + for (fileno, efile) in efiles.iter_mut() { + assert_eq!(format!("file {}", fileno), read_string(efile, 0, 10)?); + } + + Ok(()) + } +} diff --git a/pageserver/src/layered_repository/global_layer_map.rs b/pageserver/src/layered_repository/global_layer_map.rs index 9ad4bd6f44..169a89650a 100644 --- a/pageserver/src/layered_repository/global_layer_map.rs +++ b/pageserver/src/layered_repository/global_layer_map.rs @@ -10,7 +10,7 @@ //! The ID can be used to relocate the layer later, without having to hold locks. //! -use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::{Arc, RwLock}; use super::inmemory_layer::InMemoryLayer; @@ -20,18 +20,10 @@ use lazy_static::lazy_static; const MAX_USAGE_COUNT: u8 = 5; lazy_static! { - pub static ref GLOBAL_LAYER_MAP: RwLock = RwLock::new(OpenLayers::default()); + pub static ref GLOBAL_LAYER_MAP: RwLock = + RwLock::new(InMemoryLayers::default()); } -/// -/// How much memory is being used by all the open layers? This is used to trigger -/// freezing and evicting an open layer to disk. -/// -/// This is only a rough approximation, it leaves out a lot of things like malloc() -/// overhead. But as long there is enough "slop" and it's not set too close to the RAM -/// size on the system, it's good enough. -pub static GLOBAL_OPEN_MEM_USAGE: AtomicUsize = AtomicUsize::new(0); - // TODO these types can probably be smaller #[derive(PartialEq, Eq, Clone, Copy)] pub struct LayerId { @@ -53,16 +45,15 @@ struct Slot { } #[derive(Default)] -pub struct OpenLayers { +pub struct InMemoryLayers { slots: Vec, num_occupied: usize, - next_victim: AtomicUsize, // Head of free-slot list. 
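+    // insert() pops a slot from this list before growing the slots Vec.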
next_empty_slot_idx: Option, } -impl OpenLayers { +impl InMemoryLayers { pub fn insert(&mut self, layer: Arc) -> LayerId { let slot_idx = match self.next_empty_slot_idx { Some(slot_idx) => slot_idx, @@ -125,64 +116,6 @@ impl OpenLayers { } } - /// Find a victim layer to evict, if the total memory usage of all open layers - /// is larger than 'limit' - pub fn find_victim_if_needed(&self, limit: usize) -> Option<(LayerId, Arc)> { - let mem_usage = GLOBAL_OPEN_MEM_USAGE.load(Ordering::Relaxed); - - if mem_usage > limit { - self.find_victim() - } else { - None - } - } - - pub fn find_victim(&self) -> Option<(LayerId, Arc)> { - if self.num_occupied == 0 { - return None; - } - - // Run the clock algorithm. - // - // FIXME: It's theoretically possible that a constant stream of get() requests - // comes in faster than we advance the clock hand, so that this never finishes. - loop { - // FIXME: Because we interpret the clock hand variable modulo slots.len(), the - // hand effectively jumps to a more or less random place whenever the array is - // expanded. That's relatively harmless, it just leads to a non-optimal choice - // of victim. Also, in a server that runs for long enough, the array should reach - // a steady-state size and not grow anymore. - let next_victim = self.next_victim.fetch_add(1, Ordering::Relaxed) % self.slots.len(); - - let slot = &self.slots[next_victim]; - - if let SlotData::Occupied(data) = &slot.data { - fn update_fn(old_usage_count: u8) -> Option { - if old_usage_count > 0 { - Some(old_usage_count - 1) - } else { - None - } - } - - if slot - .usage_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, update_fn) - .is_err() - { - // Found a slot with usage_count == 0. Return it. - return Some(( - LayerId { - index: next_victim, - tag: slot.tag, - }, - Arc::clone(data), - )); - } - } - } - } - // TODO this won't be a public API in the future pub fn remove(&mut self, layer_id: &LayerId) { let slot = &mut self.slots[layer_id.index]; diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 04ff8a3193..3efe62666f 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -1,9 +1,11 @@ +//! An in-memory layer stores recently received PageVersions. +//! The page versions are held in a BTreeMap. To avoid OOM errors, the map size is limited +//! and layers can be spilled to disk into ephemeral files. //! -//! An in-memory layer stores recently received page versions in memory. The page versions -//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation. +//! And there's another BTreeMap to track the size of the relation. //! +use crate::layered_repository::ephemeral_file::EphemeralFile; use crate::layered_repository::filename::DeltaFileName; -use crate::layered_repository::global_layer_map::GLOBAL_OPEN_MEM_USAGE; use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE, }; @@ -17,11 +19,9 @@ use anyhow::{ensure, Result}; use bytes::Bytes; use log::*; use std::path::PathBuf; -use std::sync::atomic::Ordering; use std::sync::{Arc, RwLock}; -use zenith_utils::vec_map::VecMap; - use zenith_utils::lsn::Lsn; +use zenith_utils::vec_map::VecMap; use super::page_versions::PageVersions; @@ -49,7 +49,7 @@ pub struct InMemoryLayer { } pub struct InMemoryLayerInner { - /// Frozen in-memory layers have an exclusive end LSN. 
+ /// Frozen layers have an exclusive end LSN. /// Writes are only allowed when this is None end_lsn: Option, @@ -71,15 +71,6 @@ pub struct InMemoryLayerInner { /// a non-blocky rel, 'segsizes' is not used and is always empty. /// segsizes: VecMap, - - /// Approximate amount of memory used by this layer. - /// - /// TODO: This is currently a very crude metric, we don't take into account allocator - /// overhead, memory fragmentation, memory used by the VecMaps, nor many other things. - /// Just the actual # of bytes of a page image (8 kB) or the size of a WAL record. - /// - /// Whenever this is changed, you must also modify GLOBAL_OPEN_MEM_USAGE accordingly! - mem_usage: usize, } impl InMemoryLayerInner { @@ -100,18 +91,10 @@ impl InMemoryLayerInner { } } -impl Drop for InMemoryLayerInner { - fn drop(&mut self) { - if self.mem_usage > 0 { - GLOBAL_OPEN_MEM_USAGE.fetch_sub(self.mem_usage, Ordering::Relaxed); - self.mem_usage = 0; - } - } -} - impl Layer for InMemoryLayer { - // An in-memory layer doesn't really have a filename as it's not stored on disk, - // but we construct a filename as if it was a delta layer + // An in-memory layer can be spilled to disk into ephemeral file, + // This function is used only for debugging, so we don't need to be very precise. + // Construct a filename as if it was a delta layer. fn filename(&self) -> PathBuf { let inner = self.inner.read().unwrap(); @@ -185,7 +168,7 @@ impl Layer for InMemoryLayer { .get_block_lsn_range(blknum, ..=lsn) .iter() .rev(); - for (entry_lsn, pv) in iter { + for (entry_lsn, pos) in iter { match &cached_img_lsn { Some(cached_lsn) if entry_lsn <= cached_lsn => { return Ok(PageReconstructResult::Cached) @@ -193,9 +176,10 @@ impl Layer for InMemoryLayer { _ => {} } + let pv = inner.page_versions.get_page_version(*pos)?; match pv { PageVersion::Page(img) => { - reconstruct_data.page_img = Some(img.clone()); + reconstruct_data.page_img = Some(img); need_image = false; break; } @@ -301,7 +285,8 @@ impl Layer for InMemoryLayer { println!("segsizes {}: {}", k, v); } - for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) { + for (blknum, lsn, pos) in inner.page_versions.ordered_page_version_iter(None) { + let pv = inner.page_versions.get_page_version(pos)?; let pv_description = match pv { PageVersion::Page(_img) => "page", PageVersion::Wal(_rec) => "wal", @@ -350,6 +335,8 @@ impl InMemoryLayer { segsizes.append(start_lsn, 0).unwrap(); } + let file = EphemeralFile::create(conf, tenantid, timelineid)?; + Ok(InMemoryLayer { conf, timelineid, @@ -361,9 +348,8 @@ impl InMemoryLayer { inner: RwLock::new(InMemoryLayerInner { end_lsn: None, dropped: false, - page_versions: PageVersions::default(), + page_versions: PageVersions::new(file), segsizes, - mem_usage: 0, }), }) } @@ -371,18 +357,18 @@ impl InMemoryLayer { // Write operations /// Remember new page version, as a WAL record over previous version - pub fn put_wal_record(&self, lsn: Lsn, blknum: u32, rec: WALRecord) -> u32 { + pub fn put_wal_record(&self, lsn: Lsn, blknum: u32, rec: WALRecord) -> Result { self.put_page_version(blknum, lsn, PageVersion::Wal(rec)) } /// Remember new page version, as a full page image - pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 { + pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result { self.put_page_version(blknum, lsn, PageVersion::Page(img)) } /// Common subroutine of the public put_wal_record() and put_page_image() functions. 
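+    /// (As of this patch, the version itself is serialized into the
+    /// ephemeral file; the in-memory tree only keeps its file offset.)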
/// Adds the page version to the in-memory tree - pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> u32 { + pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result { assert!(self.seg.blknum_in_seg(blknum)); trace!( @@ -396,14 +382,7 @@ impl InMemoryLayer { inner.assert_writeable(); - let mut mem_usage = 0; - mem_usage += match &pv { - PageVersion::Page(img) => img.len(), - PageVersion::Wal(rec) => rec.rec.len(), - }; - - let (old, delta_size) = inner.page_versions.append_or_update_last(blknum, lsn, pv); - mem_usage += delta_size; + let old = inner.page_versions.append_or_update_last(blknum, lsn, pv)?; if old.is_some() { // We already had an entry for this LSN. That's odd.. @@ -413,8 +392,6 @@ impl InMemoryLayer { ); } - let mut delta_logical_size = 0; - // Also update the relation size, if this extended the relation. if self.seg.rel.is_blocky() { let newsize = blknum - self.seg.segno * RELISH_SEG_SIZE + 1; @@ -447,10 +424,9 @@ impl InMemoryLayer { gapblknum, blknum ); - let (old, delta_size) = inner + let old = inner .page_versions - .append_or_update_last(gapblknum, lsn, zeropv); - mem_usage += delta_size; + .append_or_update_last(gapblknum, lsn, zeropv)?; // We already had an entry for this LSN. That's odd.. if old.is_some() { @@ -461,18 +437,12 @@ impl InMemoryLayer { } } - let (_old, delta_size) = - inner.segsizes.append_or_update_last(lsn, newsize).unwrap(); - mem_usage += delta_size; - - delta_logical_size = newsize - oldsize; + inner.segsizes.append_or_update_last(lsn, newsize).unwrap(); + return Ok(newsize - oldsize); } } - inner.mem_usage += mem_usage; - GLOBAL_OPEN_MEM_USAGE.fetch_add(mem_usage, Ordering::Relaxed); - - delta_logical_size + Ok(0) } /// Remember that the relation was truncated at given LSN @@ -489,9 +459,7 @@ impl InMemoryLayer { let oldsize = inner.get_seg_size(lsn); assert!(segsize < oldsize); - let (old, delta_size) = inner.segsizes.append_or_update_last(lsn, segsize).unwrap(); - inner.mem_usage += delta_size; - GLOBAL_OPEN_MEM_USAGE.fetch_add(delta_size, Ordering::Relaxed); + let (old, _delta_size) = inner.segsizes.append_or_update_last(lsn, segsize).unwrap(); if old.is_some() { // We already had an entry for this LSN. That's odd.. @@ -543,6 +511,8 @@ impl InMemoryLayer { segsizes.append(start_lsn, size).unwrap(); } + let file = EphemeralFile::create(conf, tenantid, timelineid)?; + Ok(InMemoryLayer { conf, timelineid, @@ -554,9 +524,8 @@ impl InMemoryLayer { inner: RwLock::new(InMemoryLayerInner { end_lsn: None, dropped: false, - page_versions: PageVersions::default(), + page_versions: PageVersions::new(file), segsizes, - mem_usage: 0, }), }) } @@ -586,16 +555,6 @@ impl InMemoryLayer { for (_blk, lsn, _pv) in inner.page_versions.ordered_page_version_iter(None) { assert!(lsn <= end_lsn); } - - // It's a bit premature to subtract the global mem usage here already. - // This layer consumes memory until it's written out to disk and dropped. - // But GLOBAL_OPEN_MEM_USAGE is used to trigger layer eviction, if there are - // too many open layers, and from that point of view this should no longer be - // counted against the global mem usage. 
- if inner.mem_usage > 0 { - GLOBAL_OPEN_MEM_USAGE.fetch_sub(inner.mem_usage, Ordering::Relaxed); - inner.mem_usage = 0; - } } } @@ -635,7 +594,8 @@ impl InMemoryLayer { self.start_lsn, end_lsn_exclusive, true, - inner.page_versions.ordered_page_version_iter(None), + &inner.page_versions, + None, inner.segsizes.clone(), )?; trace!( @@ -652,13 +612,9 @@ impl InMemoryLayer { // Since `end_lsn` is inclusive, subtract 1. // We want to make an ImageLayer for the last included LSN, - // so the DeltaLayer should exlcude that LSN. + // so the DeltaLayer should exclude that LSN. let end_lsn_inclusive = Lsn(end_lsn_exclusive.0 - 1); - let mut page_versions = inner - .page_versions - .ordered_page_version_iter(Some(end_lsn_inclusive)); - let mut delta_layers = Vec::new(); if self.start_lsn != end_lsn_inclusive { @@ -672,7 +628,8 @@ impl InMemoryLayer { self.start_lsn, end_lsn_inclusive, false, - page_versions, + &inner.page_versions, + Some(end_lsn_inclusive), segsizes, )?; delta_layers.push(delta_layer); @@ -683,7 +640,11 @@ impl InMemoryLayer { end_lsn_inclusive ); } else { - assert!(page_versions.next().is_none()); + assert!(inner + .page_versions + .ordered_page_version_iter(None) + .next() + .is_none()); } drop(inner); diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs index 45d5a90fe0..929932920d 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/layered_repository/layer_map.rs @@ -414,6 +414,13 @@ mod tests { forknum: 0, }); + lazy_static! { + static ref DUMMY_TIMELINEID: ZTimelineId = + ZTimelineId::from_str("00000000000000000000000000000000").unwrap(); + static ref DUMMY_TENANTID: ZTenantId = + ZTenantId::from_str("00000000000000000000000000000000").unwrap(); + } + /// Construct a dummy InMemoryLayer for testing fn dummy_inmem_layer( conf: &'static PageServerConf, @@ -424,8 +431,8 @@ mod tests { Arc::new( InMemoryLayer::create( conf, - ZTimelineId::from_str("00000000000000000000000000000000").unwrap(), - ZTenantId::from_str("00000000000000000000000000000000").unwrap(), + *DUMMY_TIMELINEID, + *DUMMY_TENANTID, SegmentTag { rel: TESTREL_A, segno, @@ -441,6 +448,7 @@ mod tests { fn test_open_layers() -> Result<()> { let conf = PageServerConf::dummy_conf(PageServerConf::test_repo_dir("dummy_inmem_layer")); let conf = Box::leak(Box::new(conf)); + std::fs::create_dir_all(conf.timeline_path(&DUMMY_TIMELINEID, &DUMMY_TENANTID))?; let mut layers = LayerMap::default(); diff --git a/pageserver/src/layered_repository/page_versions.rs b/pageserver/src/layered_repository/page_versions.rs index 4f27b59981..a75eed5f3a 100644 --- a/pageserver/src/layered_repository/page_versions.rs +++ b/pageserver/src/layered_repository/page_versions.rs @@ -1,40 +1,78 @@ +//! +//! Data structure to ingest incoming WAL into an append-only file. +//! +//! - The file is considered temporary, and will be discarded on crash +//! - based on a B-tree +//! 
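+//! On-disk format: append_or_update_last() writes each page version as a
+//! 4-byte native-endian length followed by the serialized PageVersion.
+//! The in-memory map stores, per block, a VecMap from LSN to the offset
+//! of that length field, so a version can be located without scanning.
+//!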
+ +use std::os::unix::fs::FileExt; use std::{collections::HashMap, ops::RangeBounds, slice}; +use anyhow::Result; + +use std::cmp::min; +use std::io::Seek; + use zenith_utils::{lsn::Lsn, vec_map::VecMap}; use super::storage_layer::PageVersion; +use crate::layered_repository::ephemeral_file::EphemeralFile; -const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[]; +use zenith_utils::bin_ser::BeSer; -#[derive(Debug, Default)] -pub struct PageVersions(HashMap>); +const EMPTY_SLICE: &[(Lsn, u64)] = &[]; + +pub struct PageVersions { + map: HashMap>, + + /// The PageVersion structs are stored in a serialized format in this file. + /// Each serialized PageVersion is preceded by a 'u32' length field. + /// The 'map' stores offsets into this file. + file: EphemeralFile, +} impl PageVersions { + pub fn new(file: EphemeralFile) -> PageVersions { + PageVersions { + map: HashMap::new(), + file, + } + } + pub fn append_or_update_last( &mut self, blknum: u32, lsn: Lsn, page_version: PageVersion, - ) -> (Option, usize) { - let map = self.0.entry(blknum).or_insert_with(VecMap::default); - map.append_or_update_last(lsn, page_version).unwrap() + ) -> Result> { + // remember starting position + let pos = self.file.stream_position()?; + + // make room for the 'length' field by writing zeros as a placeholder. + self.file.seek(std::io::SeekFrom::Start(pos + 4)).unwrap(); + + page_version.ser_into(&mut self.file).unwrap(); + + // write the 'length' field. + let len = self.file.stream_position()? - pos - 4; + let lenbuf = u32::to_ne_bytes(len as u32); + self.file.write_all_at(&lenbuf, pos)?; + + let map = self.map.entry(blknum).or_insert_with(VecMap::default); + Ok(map.append_or_update_last(lsn, pos as u64).unwrap().0) } /// Get all [`PageVersion`]s in a block - pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] { - self.0 + fn get_block_slice(&self, blknum: u32) -> &[(Lsn, u64)] { + self.map .get(&blknum) .map(VecMap::as_slice) .unwrap_or(EMPTY_SLICE) } /// Get a range of [`PageVersions`] in a block - pub fn get_block_lsn_range>( - &self, - blknum: u32, - range: R, - ) -> &[(Lsn, PageVersion)] { - self.0 + pub fn get_block_lsn_range>(&self, blknum: u32, range: R) -> &[(Lsn, u64)] { + self.map .get(&blknum) .map(|vec_map| vec_map.slice_range(range)) .unwrap_or(EMPTY_SLICE) @@ -43,7 +81,7 @@ impl PageVersions { /// Iterate through [`PageVersion`]s in (block, lsn) order. /// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn` pub fn ordered_page_version_iter(&self, cutoff_lsn: Option) -> OrderedPageVersionIter<'_> { - let mut ordered_blocks: Vec = self.0.keys().cloned().collect(); + let mut ordered_blocks: Vec = self.map.keys().cloned().collect(); ordered_blocks.sort_unstable(); let slice = ordered_blocks @@ -59,6 +97,40 @@ impl PageVersions { cur_slice_iter: slice.iter(), } } + + /// Returns a 'Read' that reads the page version at given offset. + pub fn reader(&self, pos: u64) -> Result { + // read length + let mut lenbuf = [0u8; 4]; + self.file.read_exact_at(&mut lenbuf, pos)?; + let len = u32::from_ne_bytes(lenbuf); + + Ok(PageVersionReader { + file: &self.file, + pos: pos + 4, + end_pos: pos + 4 + len as u64, + }) + } + + pub fn get_page_version(&self, pos: u64) -> Result { + let mut reader = self.reader(pos)?; + Ok(PageVersion::des_from(&mut reader)?) 
+ } +} + +pub struct PageVersionReader<'a> { + file: &'a EphemeralFile, + pos: u64, + end_pos: u64, +} + +impl<'a> std::io::Read for PageVersionReader<'a> { + fn read(&mut self, buf: &mut [u8]) -> Result { + let len = min(buf.len(), (self.end_pos - self.pos) as usize); + let n = self.file.read_at(&mut buf[..len], self.pos)?; + self.pos += n as u64; + Ok(n) + } } pub struct OrderedPageVersionIter<'a> { @@ -69,7 +141,7 @@ pub struct OrderedPageVersionIter<'a> { cutoff_lsn: Option, - cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>, + cur_slice_iter: slice::Iter<'a, (Lsn, u64)>, } impl OrderedPageVersionIter<'_> { @@ -83,14 +155,14 @@ impl OrderedPageVersionIter<'_> { } impl<'a> Iterator for OrderedPageVersionIter<'a> { - type Item = (u32, Lsn, &'a PageVersion); + type Item = (u32, Lsn, u64); fn next(&mut self) -> Option { loop { - if let Some((lsn, page_version)) = self.cur_slice_iter.next() { + if let Some((lsn, pos)) = self.cur_slice_iter.next() { if self.is_lsn_before_cutoff(lsn) { let blknum = self.ordered_blocks[self.cur_block_idx]; - return Some((blknum, *lsn, page_version)); + return Some((blknum, *lsn, *pos)); } } @@ -107,10 +179,34 @@ mod tests { use bytes::Bytes; use super::*; + use crate::PageServerConf; + use std::fs; + use std::str::FromStr; + use zenith_utils::zid::{ZTenantId, ZTimelineId}; + + fn repo_harness(test_name: &str) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId)> { + let repo_dir = PageServerConf::test_repo_dir(test_name); + let _ = fs::remove_dir_all(&repo_dir); + let conf = PageServerConf::dummy_conf(repo_dir); + // Make a static copy of the config. This can never be free'd, but that's + // OK in a test. + let conf: &'static PageServerConf = Box::leak(Box::new(conf)); + + let tenantid = ZTenantId::from_str("11000000000000000000000000000000").unwrap(); + let timelineid = ZTimelineId::from_str("22000000000000000000000000000000").unwrap(); + fs::create_dir_all(conf.timeline_path(&timelineid, &tenantid))?; + + Ok((conf, tenantid, timelineid)) + } #[test] - fn test_ordered_iter() { - let mut page_versions = PageVersions::default(); + fn test_ordered_iter() -> Result<()> { + let (conf, tenantid, timelineid) = repo_harness("test_ordered_iter")?; + + let file = EphemeralFile::create(conf, tenantid, timelineid)?; + + let mut page_versions = PageVersions::new(file); + const BLOCKS: u32 = 1000; const LSNS: u64 = 50; @@ -119,11 +215,11 @@ mod tests { for blknum in 0..BLOCKS { for lsn in 0..LSNS { - let (old, _delta_size) = page_versions.append_or_update_last( + let old = page_versions.append_or_update_last( blknum, Lsn(lsn), empty_page_version.clone(), - ); + )?; assert!(old.is_none()); } } @@ -150,5 +246,7 @@ mod tests { } assert!(iter.next().is_none()); assert!(iter.next().is_none()); // should be robust against excessive next() calls + + Ok(()) } } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 8480c6757c..525448b71f 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -46,11 +46,13 @@ use std::{ }; use once_cell::sync::OnceCell; +use tracing::error; use zenith_utils::{ lsn::Lsn, zid::{ZTenantId, ZTimelineId}, }; +use crate::layered_repository::writeback_ephemeral_file; use crate::{relish::RelTag, PageServerConf}; static PAGE_CACHE: OnceCell = OnceCell::new(); @@ -84,23 +86,25 @@ pub fn get() -> &'static PageCache { } } -const PAGE_SZ: usize = postgres_ffi::pg_constants::BLCKSZ as usize; +pub const PAGE_SZ: usize = postgres_ffi::pg_constants::BLCKSZ as usize; const MAX_USAGE_COUNT: u8 = 5; /// /// 
CacheKey uniquely identifies a "thing" to cache in the page cache. /// -#[derive(PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone)] enum CacheKey { MaterializedPage { hash_key: MaterializedPageHashKey, lsn: Lsn, }, - // Currently, we only store materialized page versions in the page cache. - // To cache another kind of "thing", add enum variant here. + EphemeralPage { + file_id: u64, + blkno: u32, + }, } -#[derive(PartialEq, Eq, Hash, Clone)] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] struct MaterializedPageHashKey { tenant_id: ZTenantId, timeline_id: ZTimelineId, @@ -122,6 +126,7 @@ struct Slot { struct SlotInner { key: Option, buf: &'static mut [u8; PAGE_SZ], + dirty: bool, } impl Slot { @@ -170,6 +175,8 @@ pub struct PageCache { /// can have a separate mapping map, next to this field. materialized_page_map: RwLock>>, + ephemeral_page_map: RwLock>, + /// The actual buffers with their metadata. slots: Box<[Slot]>, @@ -226,12 +233,21 @@ impl std::ops::Deref for PageWriteGuard<'_> { impl PageWriteGuard<'_> { /// Mark that the buffer contents are now valid. pub fn mark_valid(&mut self) { + assert!(self.inner.key.is_some()); assert!( !self.valid, "mark_valid called on a buffer that was already valid" ); self.valid = true; } + pub fn mark_dirty(&mut self) { + // only ephemeral pages can be dirty ATM. + assert!(matches!( + self.inner.key, + Some(CacheKey::EphemeralPage { .. }) + )); + self.inner.dirty = true; + } } impl Drop for PageWriteGuard<'_> { @@ -241,29 +257,31 @@ impl Drop for PageWriteGuard<'_> { /// initializing it, remove the mapping from the page cache. /// fn drop(&mut self) { + assert!(self.inner.key.is_some()); if !self.valid { let self_key = self.inner.key.as_ref().unwrap(); PAGE_CACHE.get().unwrap().remove_mapping(self_key); self.inner.key = None; + self.inner.dirty = false; } } } /// lock_for_read() return value -enum ReadBufResult<'a> { +pub enum ReadBufResult<'a> { Found(PageReadGuard<'a>), NotFound(PageWriteGuard<'a>), } /// lock_for_write() return value -enum WriteBufResult<'a> { +pub enum WriteBufResult<'a> { Found(PageWriteGuard<'a>), NotFound(PageWriteGuard<'a>), } impl PageCache { // - // Section 1: Public interface functions for looking up and memorizing materialized page + // Section 1.1: Public interface functions for looking up and memorizing materialized page // versions in the page cache // @@ -291,8 +309,11 @@ impl PageCache { }; if let Some(guard) = self.try_lock_for_read(&mut cache_key) { - let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key; - Some((lsn, guard)) + if let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key { + Some((lsn, guard)) + } else { + panic!("unexpected key type in slot"); + } } else { None } @@ -334,11 +355,44 @@ impl PageCache { } } + // Section 1.2: Public interface functions for working with Ephemeral pages. 
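+    //
+    // Unlike materialized pages, ephemeral pages are keyed by (file_id, blkno)
+    // and can be dirty: a dirty page is written back to its ephemeral file
+    // when the slot is evicted (see find_victim below).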
+ + pub fn read_ephemeral_buf(&self, file_id: u64, blkno: u32) -> ReadBufResult { + let mut cache_key = CacheKey::EphemeralPage { file_id, blkno }; + + self.lock_for_read(&mut cache_key) + } + + pub fn write_ephemeral_buf(&self, file_id: u64, blkno: u32) -> WriteBufResult { + let cache_key = CacheKey::EphemeralPage { file_id, blkno }; + + self.lock_for_write(&cache_key) + } + + /// Immediately drop all buffers belonging to given file, without writeback + pub fn drop_buffers_for_ephemeral(&self, drop_file_id: u64) { + for slot_idx in 0..self.slots.len() { + let slot = &self.slots[slot_idx]; + + let mut inner = slot.inner.write().unwrap(); + if let Some(key) = &inner.key { + match key { + CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => { + // remove mapping for old buffer + self.remove_mapping(key); + inner.key = None; + inner.dirty = false; + } + _ => {} + } + } + } + } + // // Section 2: Internal interface functions for lookup/update. // - // Currently, the page cache only stores materialized page images. In the - // future, to add support for a new kind of "thing" to cache, you will need + // To add support for a new kind of "thing" to cache, you will need // to add public interface routines above, and code to deal with the // "mappings" after this section. But the routines in this section should // not require changes. @@ -400,7 +454,6 @@ impl PageCache { /// } /// ``` /// - #[allow(unused)] // this is currently unused fn lock_for_read(&self, cache_key: &mut CacheKey) -> ReadBufResult { loop { // First check if the key already exists in the cache. @@ -429,6 +482,7 @@ impl PageCache { // Make the slot ready let slot = &self.slots[slot_idx]; inner.key = Some(cache_key.clone()); + inner.dirty = false; slot.usage_count.store(1, Ordering::Relaxed); return ReadBufResult::NotFound(PageWriteGuard { @@ -459,7 +513,7 @@ impl PageCache { /// Return a write-locked buffer for given block. /// - /// Similar to read_for_read(), but the returned buffer is write-locked and + /// Similar to lock_for_read(), but the returned buffer is write-locked and /// may be modified by the caller even if it's already found in the cache. fn lock_for_write(&self, cache_key: &CacheKey) -> WriteBufResult { loop { @@ -489,6 +543,7 @@ impl PageCache { // Make the slot ready let slot = &self.slots[slot_idx]; inner.key = Some(cache_key.clone()); + inner.dirty = false; slot.usage_count.store(1, Ordering::Relaxed); return WriteBufResult::NotFound(PageWriteGuard { @@ -527,6 +582,10 @@ impl PageCache { *lsn = version.lsn; Some(version.slot_idx) } + CacheKey::EphemeralPage { file_id, blkno } => { + let map = self.ephemeral_page_map.read().unwrap(); + Some(*map.get(&(*file_id, *blkno))?) + } } } @@ -546,6 +605,10 @@ impl PageCache { None } } + CacheKey::EphemeralPage { file_id, blkno } => { + let map = self.ephemeral_page_map.read().unwrap(); + Some(*map.get(&(*file_id, *blkno))?) 
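+                // (A missing mapping makes the `?` return None early.)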
+ } } } @@ -569,9 +632,14 @@ impl PageCache { } } } else { - panic!() + panic!("could not find old key in mapping") } } + CacheKey::EphemeralPage { file_id, blkno } => { + let mut map = self.ephemeral_page_map.write().unwrap(); + map.remove(&(*file_id, *blkno)) + .expect("could not find old key in mapping"); + } } } @@ -602,11 +670,21 @@ impl PageCache { } } } + CacheKey::EphemeralPage { file_id, blkno } => { + let mut map = self.ephemeral_page_map.write().unwrap(); + match map.entry((*file_id, *blkno)) { + Entry::Occupied(entry) => Some(*entry.get()), + Entry::Vacant(entry) => { + entry.insert(slot_idx); + None + } + } + } } } // - // Section 5: Misc internal helpers + // Section 4: Misc internal helpers // /// Find a slot to evict. @@ -624,11 +702,25 @@ impl PageCache { let mut inner = slot.inner.write().unwrap(); if let Some(old_key) = &inner.key { - // TODO: if we supported storing dirty pages, this is where - // we'd need to write it disk + if inner.dirty { + if let Err(err) = Self::writeback(old_key, inner.buf) { + // Writing the page to disk failed. + // + // FIXME: What to do here, when? We could propagate the error to the + // caller, but victim buffer is generally unrelated to the original + // call. It can even belong to a different tenant. Currently, we + // report the error to the log and continue the clock sweep to find + // a different victim. But if the problem persists, the page cache + // could fill up with dirty pages that we cannot evict, and we will + // loop retrying the writebacks indefinitely. + error!("writeback of buffer {:?} failed: {}", old_key, err); + continue; + } + } // remove mapping for old buffer self.remove_mapping(old_key); + inner.dirty = false; inner.key = None; } return (slot_idx, inner); @@ -638,6 +730,20 @@ impl PageCache { } } + fn writeback(cache_key: &CacheKey, buf: &[u8]) -> Result<(), std::io::Error> { + match cache_key { + CacheKey::MaterializedPage { + hash_key: _, + lsn: _, + } => { + panic!("unexpected dirty materialized page"); + } + CacheKey::EphemeralPage { file_id, blkno } => { + writeback_ephemeral_file(*file_id, *blkno, buf) + } + } + } + /// Initialize a new page cache /// /// This should be called only once at page server startup. @@ -652,7 +758,11 @@ impl PageCache { let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap(); Slot { - inner: RwLock::new(SlotInner { key: None, buf }), + inner: RwLock::new(SlotInner { + key: None, + buf, + dirty: false, + }), usage_count: AtomicU8::new(0), } }) @@ -660,6 +770,7 @@ impl PageCache { Self { materialized_page_map: Default::default(), + ephemeral_page_map: Default::default(), slots, next_evict_slot: AtomicUsize::new(0), } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 9ec72c40cc..24cef26169 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -49,7 +49,7 @@ pub struct VirtualFile { /// if a new file is created, we only pass the create flag when it's initially /// opened, in the VirtualFile::create() function, and strip the flag before /// storing it here. - path: PathBuf, + pub path: PathBuf, open_options: OpenOptions, } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index d6212a5f54..1f84ed8507 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -5,7 +5,6 @@ //! //! We keep one WAL receiver active per timeline. 
-use crate::layered_repository; use crate::relish::*; use crate::restore_local_repo; use crate::tenant_mgr; @@ -176,7 +175,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: } fn walreceiver_main( - conf: &PageServerConf, + _conf: &PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str, tenantid: ZTenantId, @@ -296,9 +295,6 @@ fn walreceiver_main( caught_up = true; } - // Release memory if needed - layered_repository::evict_layer_if_needed(conf)?; - Some(endlsn) }
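
For readers new to the module: the test_ephemeral_files test earlier in the
patch boils down to the following usage pattern. This is a sketch, not part
of the patch, and assumes a conf/tenantid/timelineid triple like the one
from repo_harness:

    use std::io::Write;
    use std::os::unix::fs::FileExt;

    // An EphemeralFile behaves like an unnamed temp file whose pages live in
    // the shared page cache and are only written back lazily, on eviction.
    let mut file = EphemeralFile::create(conf, tenantid, timelineid)?;
    file.write_all(b"hello")?; // sequential append through the Write impl

    let mut buf = [0u8; 5];
    file.read_exact_at(&mut buf, 0)?; // positional read through FileExt
    assert_eq!(&buf, b"hello");

    // Dropping `file` drops its cached pages and unlinks the file on disk.
    drop(file);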