WIP: cache vfds

This commit is contained in:
Heikki Linnakangas
2021-10-28 01:03:34 +03:00
parent 28675739de
commit 23713eb44f
8 changed files with 247 additions and 41 deletions

2
Cargo.lock generated
View File

@@ -208,8 +208,6 @@ dependencies = [
[[package]]
name = "bookfile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efa3e2086414e1bbecbc10730f265e5b079ab4ea0b830e7219a70dab6471e753"
dependencies = [
"aversion",
"byteorder",

View File

@@ -5,7 +5,7 @@ authors = ["Stas Kelvich <stas@zenith.tech>"]
edition = "2018"
[dependencies]
bookfile = "^0.3"
bookfile = { path = "../../bookfile" }
chrono = "0.4.19"
rand = "0.8.3"
regex = "1.4.5"

View File

@@ -6,8 +6,8 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct BlobRange {
offset: u64,
size: usize,
pub offset: u64,
pub size: usize,
}
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> {

View File

@@ -42,6 +42,7 @@ use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
};
use crate::vfd::VirtualFile;
use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
@@ -145,6 +146,8 @@ pub struct DeltaLayerInner {
/// `relsizes` tracks the size of the relation at different points in time.
relsizes: VecMap<Lsn, u32>,
vfile: VirtualFile,
}
impl Layer for DeltaLayer {
@@ -186,9 +189,11 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
// TODO: avoid opening the file for each read
let (_path, book) = self.open_book()?;
let mut inner = self.load()?;
let file = inner.vfile.open()?;
let book = Book::new(file)?;
let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
let inner = self.load()?;
// Scan the metadata BTreeMap backwards, starting from the given entry.
let minkey = (blknum, Lsn(0));
@@ -221,6 +226,9 @@ impl Layer for DeltaLayer {
}
// release metadata lock and close the file
let file = book.close();
inner.vfile.cache(file);
}
// If an older page image is needed to reconstruct the page, let the
@@ -365,6 +373,18 @@ impl DeltaLayer {
assert!(!relsizes.is_empty());
}
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
&DeltaFileName {
seg: seg,
start_lsn: start_lsn,
end_lsn: end_lsn,
dropped: dropped,
}
);
let delta_layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -377,6 +397,7 @@ impl DeltaLayer {
loaded: true,
page_version_metas: VecMap::default(),
relsizes,
vfile: VirtualFile::new(&path),
}),
};
let mut inner = delta_layer.inner.lock().unwrap();
@@ -496,11 +517,9 @@ impl DeltaLayer {
debug!("loaded from {}", &path.display());
*inner = DeltaLayerInner {
loaded: true,
page_version_metas,
relsizes,
};
inner.loaded = true;
inner.page_version_metas = page_version_metas;
inner.relsizes = relsizes;
Ok(inner)
}
@@ -512,6 +531,13 @@ impl DeltaLayer {
tenantid: ZTenantId,
filename: &DeltaFileName,
) -> DeltaLayer {
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
&filename,
);
DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -524,6 +550,7 @@ impl DeltaLayer {
loaded: false,
page_version_metas: VecMap::default(),
relsizes: VecMap::default(),
vfile: VirtualFile::new(&path),
}),
}
}
@@ -534,7 +561,7 @@ impl DeltaLayer {
pub fn new_for_path(path: &Path, book: &Book<File>) -> Result<Self> {
let chapter = book.read_chapter(SUMMARY_CHAPTER)?;
let summary = Summary::des(&chapter)?;
Ok(DeltaLayer {
path_or_conf: PathOrConf::Path(path.to_path_buf()),
timelineid: summary.timelineid,
@@ -547,6 +574,7 @@ impl DeltaLayer {
loaded: false,
page_version_metas: VecMap::default(),
relsizes: VecMap::default(),
vfile: VirtualFile::new(path),
}),
})
}

View File

@@ -29,6 +29,7 @@ use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::RELISH_SEG_SIZE;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use crate::vfd::VirtualFile;
use anyhow::{anyhow, bail, ensure, Result};
use bytes::Bytes;
use log::*;
@@ -36,7 +37,7 @@ use serde::{Deserialize, Serialize};
use std::convert::TryInto;
use std::fs;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Mutex, MutexGuard};
@@ -110,6 +111,8 @@ pub struct ImageLayerInner {
/// Derived from filename and bookfile chapter metadata
image_type: ImageType,
vfile: VirtualFile,
}
impl Layer for ImageLayer {
@@ -147,11 +150,12 @@ impl Layer for ImageLayer {
) -> Result<PageReconstructResult> {
assert!(lsn >= self.lsn);
let inner = self.load()?;
let mut inner = self.load()?;
let base_blknum = blknum % RELISH_SEG_SIZE;
let (_path, book) = self.open_book()?;
let mut file = inner.vfile.open()?;
let mut book = Book::new(&mut file)?;
let buf = match &inner.image_type {
ImageType::Blocky { num_blocks } => {
@@ -162,17 +166,20 @@ impl Layer for ImageLayer {
let mut buf = vec![0u8; BLOCK_SIZE];
let offset = BLOCK_SIZE as u64 * base_blknum as u64;
let chapter = book.chapter_reader(BLOCKY_IMAGES_CHAPTER)?;
chapter.read_exact_at(&mut buf, offset)?;
let mut chapter = book.exclusive_chapter_reader(BLOCKY_IMAGES_CHAPTER)?;
chapter.seek(SeekFrom::Start(offset))?;
chapter.read_exact(&mut buf)?;
buf
}
ImageType::NonBlocky => {
ensure!(base_blknum == 0);
book.read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec()
book.exclusive_read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec()
}
};
inner.vfile.cache(file);
reconstruct_data.page_img = Some(Bytes::from(buf));
Ok(PageReconstructResult::Complete)
}
@@ -266,6 +273,16 @@ impl ImageLayer {
ImageType::NonBlocky
};
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
&ImageFileName {
seg: seg,
lsn: lsn,
}
);
let layer = ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -275,12 +292,12 @@ impl ImageLayer {
inner: Mutex::new(ImageLayerInner {
loaded: true,
image_type: image_type.clone(),
vfile: VirtualFile::new(&path),
}),
};
let inner = layer.inner.lock().unwrap();
// Write the images into a file
let path = layer.path();
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let file = File::create(&path)?;
@@ -374,7 +391,8 @@ impl ImageLayer {
return Ok(inner);
}
let (path, book) = self.open_book()?;
let book = Book::new(inner.vfile.open()?)?;
match &self.path_or_conf {
PathOrConf::Conf(_) => {
@@ -412,12 +430,10 @@ impl ImageLayer {
ImageType::NonBlocky
};
debug!("loaded from {}", &path.display());
debug!("loaded from {}", &self.path().display());
*inner = ImageLayerInner {
loaded: true,
image_type,
};
inner.loaded = true;
inner.image_type = image_type;
Ok(inner)
}
@@ -438,6 +454,14 @@ impl ImageLayer {
tenantid: ZTenantId,
filename: &ImageFileName,
) -> ImageLayer {
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
filename,
);
ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -447,6 +471,7 @@ impl ImageLayer {
inner: Mutex::new(ImageLayerInner {
loaded: false,
image_type: ImageType::Blocky { num_blocks: 0 },
vfile: VirtualFile::new(&path),
}),
}
}
@@ -467,6 +492,7 @@ impl ImageLayer {
inner: Mutex::new(ImageLayerInner {
loaded: false,
image_type: ImageType::Blocky { num_blocks: 0 },
vfile: VirtualFile::new(path),
}),
})
}

View File

@@ -21,6 +21,7 @@ pub mod tenant_mgr;
pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
pub mod vfd;
pub mod defaults {
use const_format::formatcp;

160
pageserver/src/vfd.rs Normal file
View File

@@ -0,0 +1,160 @@
use std::fs::File;
use std::io::Seek;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use lazy_static::lazy_static;
/// Tag carried by a `VirtualFile` that has never been cached.
///
/// `VirtualFile::new` starts every handle with this value, and the `Drop`
/// implementation uses it to skip the slot lookup entirely. Slot tags start at
/// 0 and are only ever incremented by one per `cache` call.
const INVALID_TAG: u64 = u64::MAX;
/// Table of slots holding cached, already-opened files.
///
/// A single instance lives behind the `OPEN_FILES` mutex.
struct OpenFiles {
    /// The slots themselves. `VirtualFile::cache` appends new slots until there
    /// are 100 of them and reuses existing slots after that.
    files: Vec<OpenFile>,

    /// Index of the slot that the next `VirtualFile::cache` call will fill.
    /// When it has run past the end of a full table, `cache` wraps it back to
    /// slot 0.
    next: usize,
}
// Global slot table used by every `VirtualFile`. It starts out empty with the
// cursor at slot 0, and all access to it goes through this mutex.
lazy_static! {
    static ref OPEN_FILES: Mutex<OpenFiles> = Mutex::new(OpenFiles {
        next: 0,
        files: Vec::new(),
    });
}
/// One slot of the open-file table.
struct OpenFile {
    /// The cached file. `None` while the slot is empty, i.e. before its first
    /// use or after its file has been taken back out by `VirtualFile::open` or
    /// by `VirtualFile`'s destructor.
    file: Option<File>,

    /// Generation counter, bumped each time a file is cached into this slot.
    /// A `VirtualFile` may only take the file out if the tag it remembered
    /// still equals this value.
    tag: u64,
}
/// Handle to a file on disk whose opened `File` can be parked in a global
/// cache between uses.
///
/// Call `open` to obtain a `File` and `cache` to hand it back afterwards.
pub struct VirtualFile {
    /// Path that `open` opens when no cached file is available.
    path: PathBuf,

    /// Index of the slot this handle's file was last cached in.
    vfd: usize,

    /// Tag the slot had when this handle last cached a file there, or
    /// `INVALID_TAG` if nothing has been cached yet. The cached file is only
    /// reused while the slot still carries this tag.
    tag: u64,
}
impl VirtualFile {
pub fn new(path: &Path) -> VirtualFile {
VirtualFile {
vfd: 0,
tag: INVALID_TAG,
path: path.to_path_buf(),
}
}
pub fn open(&mut self) -> std::io::Result<File> {
let mut l = OPEN_FILES.lock().unwrap();
if self.vfd < l.files.len() && l.files[self.vfd].tag == self.tag {
if let Some(mut file) = l.files[self.vfd].file.take() {
// return cached File
eprintln!("reusing {} from {}/{}", self.path.display(), self.vfd, self.tag);
file.rewind()?;
return Ok(file);
}
}
eprintln!("opening {}", self.path.display());
File::open(&self.path)
}
pub fn cache(&mut self, file: File) {
let mut l = OPEN_FILES.lock().unwrap();
let next = if l.next >= l.files.len() {
if l.files.len() < 100 {
l.files.push(OpenFile {
tag: 0,
file: None
});
l.files.len() - 1
} else {
// wrap around
0
}
} else {
l.next
};
l.next = next + 1;
l.files[next].file.replace(file);
l.files[next].tag += 1;
self.vfd = next;
self.tag = l.files[next].tag;
eprintln!("caching {} at {}/{}", self.path.display(), self.vfd, self.tag);
drop(l);
}
}
impl Drop for VirtualFile {
    /// Closes the file this handle left in the cache, if it is still there.
    fn drop(&mut self) {
        // A handle that never cached anything owns no slot.
        if self.tag == INVALID_TAG {
            return;
        }

        // Destructors also run while a panic is unwinding, and a second panic
        // at that point aborts the process. So recover the guard from a
        // poisoned lock instead of unwrapping it.
        let mut table = OPEN_FILES
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let parked = match table.files.get_mut(self.vfd) {
            // The slot is still ours only while its tag matches.
            Some(slot) if slot.tag == self.tag => slot.file.take(),
            _ => None,
        };
        drop(table);

        // Close file if it's still open; done after releasing the lock so the
        // close does not hold up other threads.
        drop(parked);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::PageServerConf;
    use std::io::Read;

    /// Creates 2000 small files, then walks through them reading each one via
    /// its `VirtualFile` and handing the file back to the cache. After every
    /// file, the very first file is read again the same way.
    #[test]
    fn test_vfd() -> anyhow::Result<()> {
        /// Opens `vfile`, checks that it reads back as `expected_content`, and
        /// returns the opened file to the cache.
        fn read_and_recache(vfile: &mut VirtualFile, expected_content: &str) -> anyhow::Result<()> {
            let mut file = vfile.open()?;
            let mut s = String::new();
            file.read_to_string(&mut s)?;
            assert!(&s == expected_content);
            vfile.cache(file);
            Ok(())
        }

        // Start from an empty directory; it may not exist yet, so a failure
        // to remove it is ignored.
        let test_dir = PageServerConf::test_repo_dir("test_vfd");
        let _ = std::fs::remove_dir_all(&test_dir);
        std::fs::create_dir_all(&test_dir)?;

        let mut vfiles = (0..2000)
            .map(|i| -> anyhow::Result<_> {
                let path = test_dir.join(format!("vfd_test{}", i));
                let content = format!("foobar{}", i);
                std::fs::write(&path, &content)?;
                let vfile = VirtualFile::new(&path);
                Ok((vfile, path, content))
            })
            .collect::<anyhow::Result<Vec<_>>>()?;

        // Indexing is deliberate: every iteration touches both entry `i` and
        // entry 0, which are the same element on the first pass.
        for i in 0..vfiles.len() {
            let (vfile, _path, expected_content) = &mut vfiles[i];
            read_and_recache(vfile, expected_content)?;

            let (vfile, _path, expected_content) = &mut vfiles[0];
            read_and_recache(vfile, expected_content)?;
        }

        Ok(())
    }
}

View File

@@ -252,7 +252,7 @@ impl PostgresRedoManager {
.unwrap();
let mut processes: Vec<Mutex<Option<PostgresRedoProcess>>> = Vec::new();
for _ in 1..4 {
for _ in 1..10 {
processes.push(Mutex::new(None));
}
@@ -291,10 +291,10 @@ impl PostgresRedoManager {
let duration = start.elapsed();
info!(
"postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}",
trace!(
"postgres applied {} WAL records in {} us to reconstruct page image at LSN {}",
nrecords,
duration.as_millis(),
duration.as_micros(),
lsn
);
@@ -602,12 +602,8 @@ impl PostgresRedoProcess {
// version is not needed.)
let mut buf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut buf);
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
buf.clear();
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut buf);
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
buf.clear();
}
// Send WAL records.
@@ -615,8 +611,6 @@ impl PostgresRedoProcess {
WAL_REDO_RECORD_COUNTER.inc();
build_apply_record_msg(*lsn, &rec.rec, &mut buf);
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
buf.clear();
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
@@ -627,7 +621,6 @@ impl PostgresRedoProcess {
// Send GetPage command to get the result back
build_get_page_msg(tag, &mut buf);
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
buf.clear();
timeout(TIMEOUT, stdin.flush()).await??;
//debug!("sent GetPage for {}", tag.blknum);
Ok::<(), Error>(())
@@ -663,7 +656,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
//debug_assert!(buf.len() == 1 + len);
}
fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
@@ -677,7 +670,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
.expect("serialize BufferTag should always succeed");
buf.put(base_img);
debug_assert!(buf.len() == 1 + len);
//debug_assert!(buf.len() - oldlen == 1 + len);
}
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
@@ -688,7 +681,7 @@ fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
buf.put_u64(endlsn.0);
buf.put(rec);
debug_assert!(buf.len() == 1 + len);
//debug_assert!(buf.len() - oldlen == 1 + len);
}
fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
@@ -699,5 +692,5 @@ fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
//debug_assert!(buf.len() == 1 + len);
}