Rewrite TOAST to use the same tree as main index

This commit is contained in:
Konstantin Knizhnik
2021-10-29 17:00:09 +03:00
parent 2dd35b1fbe
commit e6f33a5cd0
5 changed files with 222 additions and 236 deletions

2
Cargo.lock generated
View File

@@ -2572,6 +2572,8 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17eda78e0aabbe323628af9b09766827e5b321b5e742cb38a6dd8f09c60a0c1"
dependencies = [
"anyhow",
"crc32c",

View File

@@ -754,55 +754,77 @@ impl Timeline for BufferedTimeline {
.ser()?
.to_vec();
let till = StoreKey::Data(DataKey { rel, blknum, lsn }).ser()?.to_vec();
let store = self.store.read().unwrap();
let mut iter = store.data.range(&from..=&till);
//let mut reconstruct_key: Option<DataKey> = None;
let result = {
let store = self.store.read().unwrap();
let mut iter = store.data.range(&from..=&till);
// locate latest version with LSN <= than requested
if let Some(pair) = iter.next_back() {
let ver = PageVersion::des(&pair?.1)?;
match ver {
PageVersion::Image(img) => Ok(img), // already materialized: we are done
PageVersion::Delta(rec) => {
let mut will_init = rec.will_init;
let mut data = PageReconstructData {
records: Vec::new(),
page_img: None,
};
data.records.push((lsn, rec));
// loop until we locate full page image or initialization WAL record
// FIXME-KK: cross-timelines histories are not handled now
while !will_init {
if let Some(entry) = iter.next_back() {
let pair = entry?;
let key = StoreKey::des(&pair.0)?;
let ver = PageVersion::des(&pair.1)?;
if let StoreKey::Data(dk) = key {
assert!(dk.rel == rel); // check that we don't jump to previous relish before locating full image
match ver {
PageVersion::Image(img) => {
data.page_img = Some(img);
break;
}
PageVersion::Delta(rec) => {
will_init = rec.will_init;
data.records.push((dk.lsn, rec));
// locate latest version with LSN <= than requested
if let Some(entry) = iter.next_back() {
let pair = entry?;
let key = StoreKey::des(&pair.0)?;
if let StoreKey::Data(dk) = key {
let ver = PageVersion::des(&pair.1)?;
match ver {
PageVersion::Image(img) => Ok(img), // already materialized: we are done
PageVersion::Delta(rec) => {
let mut will_init = rec.will_init;
let mut data = PageReconstructData {
records: Vec::new(),
page_img: None,
};
//reconstruct_key = Some(dk);
data.records.push((dk.lsn, rec));
// loop until we locate full page image or initialization WAL record
// FIXME-KK: cross-timelines histories are not handled now
while !will_init {
if let Some(entry) = iter.next_back() {
let pair = entry?;
let key = StoreKey::des(&pair.0)?;
let ver = PageVersion::des(&pair.1)?;
if let StoreKey::Data(dk) = key {
assert!(dk.rel == rel); // check that we don't jump to previous relish before locating full image
match ver {
PageVersion::Image(img) => {
data.page_img = Some(img);
break;
}
PageVersion::Delta(rec) => {
will_init = rec.will_init;
data.records.push((dk.lsn, rec));
}
}
} else {
bail!("Unexpected key type {:?}", key);
}
} else {
bail!("Base image not found for relish {} at {}", rel, lsn);
}
} else {
bail!("Unexpected key type {:?}", key);
}
} else {
bail!("Base image not found for relish {} at {}", rel, lsn);
RECONSTRUCT_TIME.observe_closure_duration(|| {
self.reconstruct_page(rel, blknum, lsn, data)
})
}
}
RECONSTRUCT_TIME
.observe_closure_duration(|| self.reconstruct_page(rel, blknum, lsn, data))
} else {
bail!("Unexpected key type {:?}", key);
}
} else {
warn!("block {} of relish {} not found at {}", blknum, rel, lsn);
Ok(ZERO_PAGE.clone())
}
};
/*
if let Some(key) = reconstruct_key {
if let Ok(img) = &result {
let mut store = self.store.write().unwrap();
store
.data
.put(&StoreKey::Data(key).ser()?, &PageVersion::Image(img.clone()).ser()?)?;
}
} else {
warn!("block {} of relish {} not found at {}", blknum, rel, lsn);
Ok(ZERO_PAGE.clone())
}
*/
result
}
fn get_relish_size(&self, rel: RelishTag, lsn: Lsn) -> Result<Option<u32>> {

View File

@@ -33,7 +33,7 @@ pub mod defaults {
// Minimal size of WAL records chain to trigger materialization of the page
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_millis(1000);
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);

View File

@@ -1,23 +1,13 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use lz4_flex;
use serde::{Deserialize, Serialize};
use std::ops::RangeBounds;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use tracing::*;
use std::convert::TryInto;
use yakv::storage::{Key, Storage, StorageIterator, Value};
use zenith_utils::bin_ser::BeSer;
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
const MAIN_CACHE_SIZE: usize = 8 * 1024; // 64Mb
const TOAST_CACHE_SIZE: usize = 1024; // 8Mb
const MAIN_COMMIT_THRESHOLD: usize = MAIN_CACHE_SIZE / 2;
const TOAST_COMMIT_THRESHOLD: usize = TOAST_CACHE_SIZE / 2;
const TOAST_VALUE_TAG: u8 = 0;
const PLAIN_VALUE_TAG: u8 = 1;
type ToastId = u32;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
const COMMIT_THRESHOLD: usize = CACHE_SIZE / 2;
///
/// Toast storage consists of two KV databases: one for storing main index
@@ -26,50 +16,90 @@ type ToastId = u32;
/// data locality and reduce key size for TOAST segments.
///
// NOTE(review): this listing shows both the `main`/`toast`/`next_id` fields and the
// single `db` field; presumably only one of the two layouts exists in any given
// revision of the file - confirm against the real source.
pub struct ToastStore {
main: Storage, // primary storage
toast: Storage, // storage for TOAST segments
next_id: ToastId, // counter used to identify new TOAST segments
db: Storage, // key-value database
pub committed: bool, // last transaction was committed (not delayed)
}
///
/// TOAST reference.
///
/// Serialized and stored in the main storage (prefixed with `TOAST_VALUE_TAG`)
/// in place of a value whose compressed data was written to the TOAST storage.
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct ToastRef {
toast_id: ToastId, // assigned TOAST identifier
orig_size: u32, // Original (uncompressed) value size; passed to the decompressor in `detoast`
compressed_size: u32, // Compressed object size; used to derive the number of segments
}
///
/// Identifier of TOAST segment.
///
/// Its serialized form is used as the key of a segment in the TOAST storage.
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct ToastSegId {
toast_id: ToastId, // identifier shared by all segments of one TOASTed value
segno: u32, // segment number used to extract segments in proper order
}
/// Iterator over the (key, value) pairs of a `ToastStore`, yielding values in
/// their original (reassembled / decompressed) form.
pub struct ToastIterator<'a> {
// Owning store; used by the `self.store.detoast(value)` variant of `next`/`next_back`.
store: &'a ToastStore,
// Underlying iterator over the raw key-value storage.
iter: StorageIterator<'a>,
}
/// Forward iteration over the store.
///
/// NOTE(review): this listing contains two bodies for `next`: a short one that
/// delegates to `detoast`, and a loop that reassembles segments stored under
/// suffixed keys. Presumably only one of them is present in a given revision.
impl<'a> Iterator for ToastIterator<'a> {
type Item = Result<(Key, Value)>;
fn next(&mut self) -> Option<Self::Item> {
// Variant 1: take the next raw entry and resolve a possible TOAST reference.
self.iter.next().and_then(|res| {
Some(res.and_then(|(key, value)| Ok((key, self.store.detoast(value)?))))
})
// Variant 2: segments are stored inline, under the original key plus a 4-byte suffix.
// Buffer collecting the compressed segments of the value being reassembled.
let mut toast: Option<Vec<u8>> = None;
// Segment number expected next; 0 means no value is currently being reassembled.
let mut next_segno = 0u16;
while let Some(elem) = self.iter.next() {
if let Ok((key,value)) = elem {
let key_len = key.len();
// Key suffix layout: big-endian u16 segment count, then big-endian u16 segment number.
let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len-2..].try_into().unwrap());
// Strip the suffix to recover the key as passed to `put`.
let key = key[..key_len-4].to_vec();
if n_segments != 0 { // TOAST
// Segments must arrive in ascending order starting from 0.
assert_eq!(segno, next_segno);
if next_segno == 0 {
// First segment: reserve room for the maximal compressed size.
toast = Some(Vec::with_capacity(n_segments as usize * TOAST_SEGMENT_SIZE))
}
toast.as_mut().unwrap().extend_from_slice(&value);
next_segno = segno + 1;
if next_segno == n_segments {
// Last segment appended: decompress (the uncompressed size is prepended to the data).
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
}
} else {
// Segment count 0 marks a plain value, returned as stored.
return Some(Ok((key, value)));
}
} else {
// Error from the underlying iterator: hand it to the caller unchanged.
return Some(elem)
}
}
// The underlying iterator must not end in the middle of a segmented value.
assert_eq!(next_segno, 0);
None
}
}
/// Backward iteration over the store.
///
/// NOTE(review): as with `next`, this listing contains two bodies for `next_back`:
/// one delegating to `detoast` and one reassembling inline segments in reverse.
/// Presumably only one of them is present in a given revision.
impl<'a> DoubleEndedIterator for ToastIterator<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
// Variant 1: take the previous raw entry and resolve a possible TOAST reference.
self.iter.next_back().and_then(|res| {
Some(res.and_then(|(key, value)| Ok((key, self.store.detoast(value)?))))
})
// Variant 2: segments are stored inline and are visited from the last one to the first.
// Buffer for the compressed data of the value being reassembled.
let mut toast: Option<Vec<u8>> = None;
// Number of the most recently consumed segment; 0 means nothing is in progress.
let mut next_segno = 0u16;
while let Some(elem) = self.iter.next_back() {
if let Ok((key,value)) = elem {
let key_len = key.len();
// Key suffix layout: big-endian u16 segment count, then big-endian u16 segment number.
let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len-2..].try_into().unwrap());
// Strip the suffix to recover the key as passed to `put`.
let key = key[..key_len-4].to_vec();
if n_segments != 0 { // TOAST
// Either this is the first segment seen, or it directly precedes the previous one.
assert!(segno+1 == next_segno || next_segno == 0);
if next_segno == 0 {
// First segment seen in reverse order is the final (possibly short) one; all
// preceding segments are written with exactly TOAST_SEGMENT_SIZE bytes by `put`,
// so the total compressed length is known up front.
let len = (n_segments-1) as usize * TOAST_SEGMENT_SIZE + value.len();
let mut vec = vec![0u8; len];
vec[len - value.len()..].copy_from_slice(&value);
toast = Some(vec);
} else {
// Copy a full-size segment into its slot within the preallocated buffer.
toast.as_mut().unwrap()[segno as usize*TOAST_SEGMENT_SIZE..(segno+1) as usize*TOAST_SEGMENT_SIZE].copy_from_slice(&value);
}
next_segno = segno;
if next_segno == 0 {
// Segment 0 reached: the buffer is complete, decompress it.
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
}
} else {
// Segment count 0 marks a plain value, returned as stored.
return Some(Ok((key, value)));
}
} else {
// Error from the underlying iterator: hand it to the caller unchanged.
return Some(elem)
}
}
// The underlying iterator must not end in the middle of a segmented value.
assert_eq!(next_segno, 0);
None
}
}
@@ -79,198 +109,128 @@ impl<'a> DoubleEndedIterator for ToastIterator<'a> {
impl ToastStore {
/// Opens the storage files located in directory `path` and builds the store.
///
/// Returns an error if any of the underlying `Storage::open` calls fails.
///
/// NOTE(review): the listing shows both the two-database layout (`main.db` +
/// `toast.db`) and the single-database layout (`pageserver.db`); presumably only
/// one applies to a given revision.
pub fn new(path: &Path) -> Result<ToastStore> {
Ok(ToastStore {
// Arguments are presumably (database file, optional log file, cache size,
// checkpoint interval) - confirm against the yakv `Storage::open` documentation.
main: Storage::open(
&path.join("main.db"),
Some(&path.join("main.log")),
MAIN_CACHE_SIZE,
db: Storage::open(
&path.join("pageserver.db"),
Some(&path.join("pageserver.log")),
CACHE_SIZE,
CHECKPOINT_INTERVAL,
)?,
toast: Storage::open(
&path.join("toast.db"),
Some(&path.join("toast.log")),
TOAST_CACHE_SIZE,
CHECKPOINT_INTERVAL,
)?,
// 0 means "not yet known"; `put` looks up the last used id on first use.
next_id: 0,
committed: false,
})
}
/// Stores `value` under `key`.
///
/// Values of at least `TOAST_SEGMENT_SIZE` bytes are LZ4-compressed and written as
/// a sequence of segments of at most `TOAST_SEGMENT_SIZE` bytes; smaller values are
/// written as a single entry. The transaction is committed once the number of
/// pinned cache pages exceeds the commit threshold and delayed otherwise;
/// `self.committed` records which of the two happened.
///
/// NOTE(review): this listing interleaves two implementations: one using separate
/// `main`/`toast` storages with `ToastRef`/`ToastSegId`, and one using a single
/// `db` storage with a 4-byte key suffix. Presumably only one is present in a
/// given revision.
pub fn put(&mut self, key: &Key, value: &Value) -> Result<()> {
let mut main_tx = self.main.start_transaction();
let mut tx = self.db.start_transaction();
let value_len = value.len();
let main_pinned;
// Local copy of the key, extended below with the segment suffix.
let mut key = key.clone();
self.committed = false;
if value_len >= TOAST_SEGMENT_SIZE {
let mut toast_tx = self.toast.start_transaction();
if self.next_id == 0 {
// First TOAST insertion since the store was opened: continue numbering after
// the toast_id of the last key in the TOAST storage (0 if it is empty).
self.next_id = toast_tx
.iter()
.next_back()
.transpose()?
.map_or(0u32, |(key, _value)| {
ToastSegId::des(&key).unwrap().toast_id
});
}
self.next_id += 1;
let toast_id = self.next_id;
let compressed_data = lz4_flex::compress(value);
// The size-prepended form lets the reader decompress without a separately stored size.
let compressed_data = lz4_flex::compress_prepend_size(value);
let compressed_data_len = compressed_data.len();
let mut offs: usize = 0;
let mut segno = 0u32;
let mut segno = 0u16;
// Number of segments, rounding up; the cast truncates if more than u16::MAX
// segments would be needed.
let n_segments = ((compressed_data_len + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16;
assert!(n_segments != 0);
// Key suffix: big-endian u16 segment count followed by a big-endian u16 segment
// number (filled in per segment below).
// NOTE(review): since the segment count is part of the stored key, putting the same
// key again with a different segment count would not replace the earlier entries -
// confirm that callers never overwrite a key in place.
key.extend_from_slice(&n_segments.to_be_bytes());
key.extend_from_slice(&[0u8;2]);
let key_len = key.len();
// Write all full-size segments.
while offs + TOAST_SEGMENT_SIZE <= compressed_data_len {
toast_tx.put(
&ToastSegId { toast_id, segno }.ser()?,
key[key_len-2..].copy_from_slice(&segno.to_be_bytes());
tx.put(
&key,
&compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(),
)?;
offs += TOAST_SEGMENT_SIZE;
segno += 1;
}
// Write the trailing partial segment, if any.
if offs < compressed_data_len {
toast_tx.put(
&ToastSegId { toast_id, segno }.ser()?,
key[key_len-2..].copy_from_slice(&segno.to_be_bytes());
tx.put(
&key,
&compressed_data[offs..].to_vec(),
)?;
}
// Main storage gets a tagged reference to the TOAST chain instead of the value itself.
let mut value = ToastRef {
toast_id,
orig_size: value_len as u32,
compressed_size: compressed_data_len as u32,
}
.ser()?;
value.insert(0, TOAST_VALUE_TAG);
main_tx.put(key, &value)?;
main_pinned = main_tx.get_cache_info().pinned;
// If we are going to commit main storage, then we have to commit toast storage first to avoid dangling references
if main_pinned > MAIN_COMMIT_THRESHOLD
|| toast_tx.get_cache_info().pinned > TOAST_COMMIT_THRESHOLD
{
toast_tx.commit()?;
} else {
toast_tx.delay()?;
}
} else {
// Small value, two-storage variant: stored inline, prefixed with the plain-value tag.
let mut vec = Vec::with_capacity(value.len() + 1);
vec.push(PLAIN_VALUE_TAG);
vec.extend_from_slice(&value);
main_tx.put(key, &vec)?;
main_pinned = main_tx.get_cache_info().pinned;
// Small value, single-storage variant: an all-zero suffix (segment count 0) marks
// a plain value.
key.extend_from_slice(&[0u8;4]);
tx.put(&key, value)?;
}
// Commit once enough pages are pinned in the cache, otherwise delay the transaction.
if main_pinned > MAIN_COMMIT_THRESHOLD {
main_tx.commit()?;
self.committed = true;
if tx.get_cache_info().pinned > COMMIT_THRESHOLD {
tx.commit()?;
self.committed = true;
} else {
main_tx.delay()?;
self.committed = false;
tx.delay()?;
}
Ok(())
}
/// Starts a transaction and commits it immediately.
///
/// Presumably this also makes previously delayed transactions durable - confirm
/// against the yakv transaction documentation.
///
/// NOTE(review): the listing shows both the two-storage variant (TOAST committed
/// before main) and the single-storage variant; presumably only one applies.
pub fn checkpoint(&self) -> Result<()> {
let mut main_tx = self.main.start_transaction();
let mut toast_tx = self.toast.start_transaction();
// TOAST storage is committed before the main storage.
toast_tx.commit()?;
main_tx.commit()?;
let mut tx = self.db.start_transaction();
tx.commit()?;
Ok(())
}
/// Looks up `key` in the main storage.
///
/// Returns `Ok(None)` if the key is absent; otherwise the stored value is passed
/// through `detoast` before being returned. Errors from the lookup or from
/// `detoast` are propagated.
pub fn get(&self, key: &[u8]) -> Result<Option<Value>> {
self.main
.get(&key.to_vec())
// Result<Option<V>> -> Option<Result<V>> so that the closure only runs for present keys.
.transpose()
.and_then(|res| Some(res.and_then(|value| Ok(self.detoast(value)?))))
// Back to Result<Option<V>>.
.transpose()
}
/// Returns an iterator over all entries of the store (equivalent to `range(..)`).
pub fn iter(&self) -> ToastIterator<'_> {
self.range(..)
}
/// Returns an iterator over the entries whose keys fall into `range`.
///
/// NOTE(review): the listing contains two bodies: one that forwards `range`
/// unchanged to the main storage, and one that first pads the bounds with the
/// 4-byte key suffix used by the single-storage layout. Presumably only one is
/// present in a given revision.
pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
ToastIterator {
store: self,
iter: self.main.range(range),
}
// Stored keys carry a 4-byte suffix, so the lower bound is padded with the smallest
// possible suffix and the upper bound with the largest one.
let from = match range.start_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8;4]);
Bound::Included(key)
}
// NOTE(review): padding an excluded start with zeros only skips the entry with the
// all-zero suffix; segmented entries of the same key have a non-zero suffix and
// would still be yielded. Padding with 0xFF looks like the intended behaviour - confirm.
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8;4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded
};
let till = match range.end_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8;4]);
Bound::Included(key)
}
// NOTE(review): padding an excluded end with 0xFF still admits every stored entry of
// that key (their suffixes compare lower). Padding with zeros looks like the
// intended behaviour - confirm.
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8;4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded
};
ToastIterator { iter: self.db.range((from,till)) }
}
/// Removes the entry stored under `key`, including all of its segments if the
/// value was split. Does nothing if the key is not found.
///
/// As in `put`, the transaction is committed when the number of pinned cache
/// pages exceeds the commit threshold and delayed otherwise; `self.committed`
/// records the outcome.
///
/// NOTE(review): this listing interleaves the two-storage implementation
/// (`main`/`toast`, `ToastRef`) with the single-storage one (`db`, suffixed keys).
/// Presumably only one is present in a given revision.
pub fn remove(&mut self, key: &Key) -> Result<()> {
let mut main_tx = self.main.start_transaction();
if let Some(value) = main_tx.get(key)? {
main_tx.remove(key)?;
let main_pinned = main_tx.get_cache_info().pinned;
if value[0] == TOAST_VALUE_TAG {
// The main entry was a TOAST reference: delete every segment it points to.
let mut toast_tx = self.toast.start_transaction();
let toast_ref = ToastRef::des(&value[1..])?;
// Segment count is recomputed from the compressed size, rounding up.
let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1)
/ TOAST_SEGMENT_SIZE) as u32;
for segno in 0..n_segments {
toast_tx.remove(
&ToastSegId {
toast_id: toast_ref.toast_id,
segno,
}
.ser()?,
)?;
}
// If we are going to commit main storage, then we have to commit toast storage first to avoid dangling references
if main_pinned > MAIN_COMMIT_THRESHOLD
|| toast_tx.get_cache_info().pinned > TOAST_COMMIT_THRESHOLD
{
toast_tx.commit()?;
} else {
toast_tx.delay()?;
}
}
if main_pinned > MAIN_COMMIT_THRESHOLD {
main_tx.commit()?;
self.committed = true;
} else {
main_tx.delay()?;
self.committed = false;
}
let mut tx = self.db.start_transaction();
// All stored entries of `key` lie between key+[0;4] and key+[0xFF;4].
let mut min_key = key.clone();
let mut max_key = key.clone();
min_key.extend_from_slice(&[0u8; 4]);
max_key.extend_from_slice(&[0xFFu8; 4]);
let mut iter = tx.range(&min_key..&max_key);
self.committed = false;
// Any entry of the key carries the segment count in its suffix; take the last one.
if let Some(entry) = iter.next_back() {
let mut key = entry?.0.clone();
let key_len = key.len();
let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap());
if n_segments != 0 { // TOAST
// Rewrite the segment-number part of the suffix and remove each segment in turn.
for i in 0..n_segments {
key[key_len-2..].copy_from_slice(&i.to_be_bytes());
tx.remove(&key)?;
}
} else {
// Plain value: a single entry.
tx.remove(&key)?;
}
}
if tx.get_cache_info().pinned > COMMIT_THRESHOLD {
tx.commit()?;
self.committed = true;
} else {
self.committed = false;
tx.delay()?;
}
Ok(())
}
/// Closes both underlying storages, TOAST first and then main.
///
/// Returns the first error encountered; if closing the TOAST storage fails, the
/// main storage is not closed.
pub fn close(&self) -> Result<()> {
self.toast.close()?; // commit and close TOAST store first to avoid dangling references
self.main.close()?;
Ok(())
}
/// Converts a raw value read from the main storage into the original value.
///
/// The first byte of `value` is a tag. For `TOAST_VALUE_TAG` the remainder is a
/// serialized `ToastRef`: its segments are read from the TOAST storage,
/// concatenated and decompressed. For any other tag the remainder is returned
/// as the value itself.
///
/// Panics if `value` is empty (the tag byte is indexed unconditionally).
fn detoast(&self, mut value: Value) -> Result<Value> {
if value[0] == TOAST_VALUE_TAG {
// TOAST chain
let toast_ref = ToastRef::des(&value[1..])?;
let mut toast: Value = Vec::with_capacity(toast_ref.orig_size as usize);
// Segment count is derived from the compressed size, rounding up.
let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1)
/ TOAST_SEGMENT_SIZE) as u32;
// Half-open key range [segno 0, segno n_segments) covers every segment of this toast_id.
let from = ToastSegId {
toast_id: toast_ref.toast_id,
segno: 0,
}
.ser()?;
let till = ToastSegId {
toast_id: toast_ref.toast_id,
segno: n_segments,
}
.ser()?;
// Concatenate the segments in key order.
for seg in self.toast.range(from..till) {
toast.extend_from_slice(&seg?.1);
}
// The recorded original size is handed to the decompressor.
Ok(lz4_flex::decompress(&toast, toast_ref.orig_size as usize)?)
} else {
value.remove(0); // remove toast tag
Ok(value)
}
}
Ok(())
}
}
/// Closes the store when it goes out of scope.
impl Drop for ToastStore {
fn drop(&mut self) {
// The message is logged before the close is actually attempted.
info!("Storage closed");
// FIXME-KK: better call close() explicitly
// Panics if closing fails, since `drop` has no way to return the error.
self.close().unwrap();
}
}

View File

@@ -53,6 +53,8 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::XLogRecord;
const WAL_REDO_WORKERS: usize = 2;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///