From 23713eb44f0c510fbf5681ed9c9d5b0ba7fe414d Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 28 Oct 2021 01:03:34 +0300 Subject: [PATCH] WIP: cache vfds --- Cargo.lock | 2 - pageserver/Cargo.toml | 2 +- pageserver/src/layered_repository/blob.rs | 4 +- .../src/layered_repository/delta_layer.rs | 44 ++++- .../src/layered_repository/image_layer.rs | 52 ++++-- pageserver/src/lib.rs | 1 + pageserver/src/vfd.rs | 160 ++++++++++++++++++ pageserver/src/walredo.rs | 23 +-- 8 files changed, 247 insertions(+), 41 deletions(-) create mode 100644 pageserver/src/vfd.rs diff --git a/Cargo.lock b/Cargo.lock index ee83de2118..9ed9a21dab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -208,8 +208,6 @@ dependencies = [ [[package]] name = "bookfile" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efa3e2086414e1bbecbc10730f265e5b079ab4ea0b830e7219a70dab6471e753" dependencies = [ "aversion", "byteorder", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index d696f72285..bdc85d88dc 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Stas Kelvich "] edition = "2018" [dependencies] -bookfile = "^0.3" +bookfile = { path = "../../bookfile" } chrono = "0.4.19" rand = "0.8.3" regex = "1.4.5" diff --git a/pageserver/src/layered_repository/blob.rs b/pageserver/src/layered_repository/blob.rs index b7c7c3f460..09403deee3 100644 --- a/pageserver/src/layered_repository/blob.rs +++ b/pageserver/src/layered_repository/blob.rs @@ -6,8 +6,8 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] pub struct BlobRange { - offset: u64, - size: usize, + pub offset: u64, + pub size: usize, } pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result> { diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 736a2694bf..6104155dc5 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -42,6 +42,7 @@ use crate::layered_repository::filename::{DeltaFileName, PathOrConf}; use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, }; +use crate::vfd::VirtualFile; use crate::waldecoder; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; @@ -145,6 +146,8 @@ pub struct DeltaLayerInner { /// `relsizes` tracks the size of the relation at different points in time. relsizes: VecMap, + + vfile: VirtualFile, } impl Layer for DeltaLayer { @@ -186,9 +189,11 @@ impl Layer for DeltaLayer { { // Open the file and lock the metadata in memory // TODO: avoid opening the file for each read - let (_path, book) = self.open_book()?; + let mut inner = self.load()?; + let file = inner.vfile.open()?; + let book = Book::new(file)?; + let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?; - let inner = self.load()?; // Scan the metadata BTreeMap backwards, starting from the given entry. let minkey = (blknum, Lsn(0)); @@ -221,6 +226,9 @@ impl Layer for DeltaLayer { } // release metadata lock and close the file + + let file = book.close(); + inner.vfile.cache(file); } // If an older page image is needed to reconstruct the page, let the @@ -365,6 +373,18 @@ impl DeltaLayer { assert!(!relsizes.is_empty()); } + let path = Self::path_for( + &PathOrConf::Conf(conf), + timelineid, + tenantid, + &DeltaFileName { + seg: seg, + start_lsn: start_lsn, + end_lsn: end_lsn, + dropped: dropped, + } + ); + let delta_layer = DeltaLayer { path_or_conf: PathOrConf::Conf(conf), timelineid, @@ -377,6 +397,7 @@ impl DeltaLayer { loaded: true, page_version_metas: VecMap::default(), relsizes, + vfile: VirtualFile::new(&path), }), }; let mut inner = delta_layer.inner.lock().unwrap(); @@ -496,11 +517,9 @@ impl DeltaLayer { debug!("loaded from {}", &path.display()); - *inner = DeltaLayerInner { - loaded: true, - page_version_metas, - relsizes, - }; + inner.loaded = true; + inner.page_version_metas = page_version_metas; + inner.relsizes = relsizes; Ok(inner) } @@ -512,6 +531,13 @@ impl DeltaLayer { tenantid: ZTenantId, filename: &DeltaFileName, ) -> DeltaLayer { + let path = Self::path_for( + &PathOrConf::Conf(conf), + timelineid, + tenantid, + &filename, + ); + DeltaLayer { path_or_conf: PathOrConf::Conf(conf), timelineid, @@ -524,6 +550,7 @@ impl DeltaLayer { loaded: false, page_version_metas: VecMap::default(), relsizes: VecMap::default(), + vfile: VirtualFile::new(&path), }), } } @@ -534,7 +561,7 @@ impl DeltaLayer { pub fn new_for_path(path: &Path, book: &Book) -> Result { let chapter = book.read_chapter(SUMMARY_CHAPTER)?; let summary = Summary::des(&chapter)?; - + Ok(DeltaLayer { path_or_conf: PathOrConf::Path(path.to_path_buf()), timelineid: summary.timelineid, @@ -547,6 +574,7 @@ impl DeltaLayer { loaded: false, page_version_metas: VecMap::default(), relsizes: VecMap::default(), + vfile: VirtualFile::new(path), }), }) } diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index 744f793558..9d977a9155 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -29,6 +29,7 @@ use crate::layered_repository::LayeredTimeline; use crate::layered_repository::RELISH_SEG_SIZE; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; +use crate::vfd::VirtualFile; use anyhow::{anyhow, bail, ensure, Result}; use bytes::Bytes; use log::*; @@ -36,7 +37,7 @@ use serde::{Deserialize, Serialize}; use std::convert::TryInto; use std::fs; use std::fs::File; -use std::io::{BufWriter, Write}; +use std::io::{BufWriter, Read, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; use std::sync::{Mutex, MutexGuard}; @@ -110,6 +111,8 @@ pub struct ImageLayerInner { /// Derived from filename and bookfile chapter metadata image_type: ImageType, + + vfile: VirtualFile, } impl Layer for ImageLayer { @@ -147,11 +150,12 @@ impl Layer for ImageLayer { ) -> Result { assert!(lsn >= self.lsn); - let inner = self.load()?; + let mut inner = self.load()?; let base_blknum = blknum % RELISH_SEG_SIZE; - let (_path, book) = self.open_book()?; + let mut file = inner.vfile.open()?; + let mut book = Book::new(&mut file)?; let buf = match &inner.image_type { ImageType::Blocky { num_blocks } => { @@ -162,17 +166,20 @@ impl Layer for ImageLayer { let mut buf = vec![0u8; BLOCK_SIZE]; let offset = BLOCK_SIZE as u64 * base_blknum as u64; - let chapter = book.chapter_reader(BLOCKY_IMAGES_CHAPTER)?; - chapter.read_exact_at(&mut buf, offset)?; + let mut chapter = book.exclusive_chapter_reader(BLOCKY_IMAGES_CHAPTER)?; + chapter.seek(SeekFrom::Start(offset))?; + chapter.read_exact(&mut buf)?; buf } ImageType::NonBlocky => { ensure!(base_blknum == 0); - book.read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec() + book.exclusive_read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec() } }; + inner.vfile.cache(file); + reconstruct_data.page_img = Some(Bytes::from(buf)); Ok(PageReconstructResult::Complete) } @@ -266,6 +273,16 @@ impl ImageLayer { ImageType::NonBlocky }; + let path = Self::path_for( + &PathOrConf::Conf(conf), + timelineid, + tenantid, + &ImageFileName { + seg: seg, + lsn: lsn, + } + ); + let layer = ImageLayer { path_or_conf: PathOrConf::Conf(conf), timelineid, @@ -275,12 +292,12 @@ impl ImageLayer { inner: Mutex::new(ImageLayerInner { loaded: true, image_type: image_type.clone(), + vfile: VirtualFile::new(&path), }), }; let inner = layer.inner.lock().unwrap(); // Write the images into a file - let path = layer.path(); // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? let file = File::create(&path)?; @@ -374,7 +391,8 @@ impl ImageLayer { return Ok(inner); } - let (path, book) = self.open_book()?; + + let book = Book::new(inner.vfile.open()?)?; match &self.path_or_conf { PathOrConf::Conf(_) => { @@ -412,12 +430,10 @@ impl ImageLayer { ImageType::NonBlocky }; - debug!("loaded from {}", &path.display()); + debug!("loaded from {}", &self.path().display()); - *inner = ImageLayerInner { - loaded: true, - image_type, - }; + inner.loaded = true; + inner.image_type = image_type; Ok(inner) } @@ -438,6 +454,14 @@ impl ImageLayer { tenantid: ZTenantId, filename: &ImageFileName, ) -> ImageLayer { + + let path = Self::path_for( + &PathOrConf::Conf(conf), + timelineid, + tenantid, + filename, + ); + ImageLayer { path_or_conf: PathOrConf::Conf(conf), timelineid, @@ -447,6 +471,7 @@ impl ImageLayer { inner: Mutex::new(ImageLayerInner { loaded: false, image_type: ImageType::Blocky { num_blocks: 0 }, + vfile: VirtualFile::new(&path), }), } } @@ -467,6 +492,7 @@ impl ImageLayer { inner: Mutex::new(ImageLayerInner { loaded: false, image_type: ImageType::Blocky { num_blocks: 0 }, + vfile: VirtualFile::new(path), }), }) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 6bb5d5f7f6..a792242d30 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -21,6 +21,7 @@ pub mod tenant_mgr; pub mod waldecoder; pub mod walreceiver; pub mod walredo; +pub mod vfd; pub mod defaults { use const_format::formatcp; diff --git a/pageserver/src/vfd.rs b/pageserver/src/vfd.rs new file mode 100644 index 0000000000..0c5855a30d --- /dev/null +++ b/pageserver/src/vfd.rs @@ -0,0 +1,160 @@ +use std::fs::File; +use std::io::Seek; +use std::path::{Path, PathBuf}; +use std::sync::Mutex; + +use lazy_static::lazy_static; + +const INVALID_TAG: u64 = u64::MAX; + +struct OpenFiles { + next: usize, + files: Vec, +} + +lazy_static! { + static ref OPEN_FILES: Mutex = Mutex::new(OpenFiles { + next: 0, + files: Vec::new(), + }); +} + +struct OpenFile { + tag: u64, + file: Option, +} + +pub struct VirtualFile { + vfd: usize, + tag: u64, + + path: PathBuf, +} + +impl VirtualFile { + + pub fn new(path: &Path) -> VirtualFile { + VirtualFile { + vfd: 0, + tag: INVALID_TAG, + path: path.to_path_buf(), + } + } + + pub fn open(&mut self) -> std::io::Result { + + let mut l = OPEN_FILES.lock().unwrap(); + + if self.vfd < l.files.len() && l.files[self.vfd].tag == self.tag { + + if let Some(mut file) = l.files[self.vfd].file.take() { + // return cached File + eprintln!("reusing {} from {}/{}", self.path.display(), self.vfd, self.tag); + file.rewind()?; + return Ok(file); + } + } + eprintln!("opening {}", self.path.display()); + + File::open(&self.path) + } + + pub fn cache(&mut self, file: File) { + + let mut l = OPEN_FILES.lock().unwrap(); + + let next = if l.next >= l.files.len() { + if l.files.len() < 100 { + l.files.push(OpenFile { + tag: 0, + file: None + }); + l.files.len() - 1 + } else { + // wrap around + 0 + } + } else { + l.next + }; + l.next = next + 1; + + l.files[next].file.replace(file); + l.files[next].tag += 1; + + self.vfd = next; + self.tag = l.files[next].tag; + + eprintln!("caching {} at {}/{}", self.path.display(), self.vfd, self.tag); + + drop(l); + } +} + +impl Drop for VirtualFile { + fn drop(&mut self) { + + // Close file if it's still open + + if self.tag != INVALID_TAG { + let mut l = OPEN_FILES.lock().unwrap(); + + if self.vfd < l.files.len() && l.files[self.vfd].tag == self.tag { + l.files[self.vfd].file.take(); + } + } + } +} + + + +#[cfg(test)] +mod tests { + use crate::PageServerConf; + use super::*; + use std::io::Read; + + #[test] + fn test_vfd() -> anyhow::Result<()> { + + let mut vfiles = Vec::new(); + + let test_dir = PageServerConf::test_repo_dir("test_vfd"); + let _ = std::fs::remove_dir_all(&test_dir); + std::fs::create_dir_all(&test_dir)?; + + for i in 0..2000 { + let path = test_dir.join(format!("vfd_test{}", i)); + let content = format!("foobar{}", i); + + std::fs::write(&path, &content)?; + + let vfile = VirtualFile::new(&path); + + vfiles.push((vfile, path, content)); + } + + for i in 0..vfiles.len() { + let (ref mut vfile, _path, expected_content) = &mut vfiles[i]; + let mut s = String::new(); + + let mut file = vfile.open()?; + file.read_to_string(&mut s)?; + + assert!(&s == expected_content); + + vfile.cache(file); + + s.clear(); + let (ref mut vfile, _path, expected_content) = &mut vfiles[0]; + let mut file = vfile.open()?; + file.read_to_string(&mut s)?; + + assert!(&s == expected_content); + + vfile.cache(file); + } + + Ok(()) + } +} diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 915193a5d6..db45623ad4 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -252,7 +252,7 @@ impl PostgresRedoManager { .unwrap(); let mut processes: Vec>> = Vec::new(); - for _ in 1..4 { + for _ in 1..10 { processes.push(Mutex::new(None)); } @@ -291,10 +291,10 @@ impl PostgresRedoManager { let duration = start.elapsed(); - info!( - "postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}", + trace!( + "postgres applied {} WAL records in {} us to reconstruct page image at LSN {}", nrecords, - duration.as_millis(), + duration.as_micros(), lsn ); @@ -602,12 +602,8 @@ impl PostgresRedoProcess { // version is not needed.) let mut buf: Vec = Vec::new(); build_begin_redo_for_block_msg(tag, &mut buf); - timeout(TIMEOUT, stdin.write_all(&buf)).await??; - buf.clear(); if let Some(img) = base_img { build_push_page_msg(tag, &img, &mut buf); - timeout(TIMEOUT, stdin.write_all(&buf)).await??; - buf.clear(); } // Send WAL records. @@ -615,8 +611,6 @@ impl PostgresRedoProcess { WAL_REDO_RECORD_COUNTER.inc(); build_apply_record_msg(*lsn, &rec.rec, &mut buf); - timeout(TIMEOUT, stdin.write_all(&buf)).await??; - buf.clear(); //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", // r.lsn >> 32, r.lsn & 0xffff_ffff); @@ -627,7 +621,6 @@ impl PostgresRedoProcess { // Send GetPage command to get the result back build_get_page_msg(tag, &mut buf); timeout(TIMEOUT, stdin.write_all(&buf)).await??; - buf.clear(); timeout(TIMEOUT, stdin.flush()).await??; //debug!("sent GetPage for {}", tag.blknum); Ok::<(), Error>(()) @@ -663,7 +656,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec) { tag.ser_into(buf) .expect("serialize BufferTag should always succeed"); - debug_assert!(buf.len() == 1 + len); + //debug_assert!(buf.len() == 1 + len); } fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec) { @@ -677,7 +670,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec) { .expect("serialize BufferTag should always succeed"); buf.put(base_img); - debug_assert!(buf.len() == 1 + len); + //debug_assert!(buf.len() - oldlen == 1 + len); } fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec) { @@ -688,7 +681,7 @@ fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec) { buf.put_u64(endlsn.0); buf.put(rec); - debug_assert!(buf.len() == 1 + len); + //debug_assert!(buf.len() - oldlen == 1 + len); } fn build_get_page_msg(tag: BufferTag, buf: &mut Vec) { @@ -699,5 +692,5 @@ fn build_get_page_msg(tag: BufferTag, buf: &mut Vec) { tag.ser_into(buf) .expect("serialize BufferTag should always succeed"); - debug_assert!(buf.len() == 1 + len); + //debug_assert!(buf.len() == 1 + len); }