Implement garbage collector for buffered repository

This commit is contained in:
Konstantin Knizhnik
2021-10-30 13:10:04 +03:00
parent e6f33a5cd0
commit a3e94e888a
5 changed files with 267 additions and 143 deletions

View File

@@ -298,6 +298,7 @@ impl PostgresNode {
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");
conf.append("max_wal_size", "100GB");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_sender_timeout", "0");

View File

@@ -814,7 +814,7 @@ impl Timeline for BufferedTimeline {
Ok(ZERO_PAGE.clone())
}
};
/*
/*
if let Some(key) = reconstruct_key {
if let Ok(img) = &result {
let mut store = self.store.write().unwrap();
@@ -823,7 +823,7 @@ impl Timeline for BufferedTimeline {
.put(&StoreKey::Data(key).ser()?, &PageVersion::Image(img.clone()).ser()?)?;
}
}
*/
*/
result
}
@@ -1314,11 +1314,125 @@ impl BufferedTimeline {
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
pub fn gc_timeline(&self, _retain_lsns: Vec<Lsn>, _cutoff: Lsn) -> Result<GcResult> {
// TODO: not implemented yet for buffred storage
let result: GcResult = Default::default();
pub fn gc_timeline(&self, _retain_lsns: Vec<Lsn>, cutoff: Lsn) -> Result<GcResult> {
let now = Instant::now();
let mut result: GcResult = Default::default();
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
info!("GC starting");
let mut from_rel = RelishTag::Relation(RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: 0,
});
let mut from = StoreKey::Metadata(MetadataKey {
rel: from_rel,
lsn: Lsn(0),
});
// We can not remove deteriorated version immediately, we need to check first that successor exists
let mut last_key: Option<yakv::storage::Key> = None;
'meta: loop {
let store = self.store.read().unwrap();
let mut iter = store.data.range(&from.ser()?..);
while let Some(entry) = iter.next() {
let raw_key = entry?.0;
let key = StoreKey::des(&raw_key)?;
if let StoreKey::Metadata(dk) = key {
if dk.lsn < cutoff {
// we have something deteriorated
if let Some(prev_key) = last_key {
// We are still on the same relish...
if from_rel == dk.rel {
drop(store);
let mut store = self.store.write().unwrap();
store.data.remove(&prev_key)?;
last_key = None;
// We should reset iterator and start from the current point
continue 'meta;
}
}
from_rel = dk.rel;
from = key;
// Remember key as candidate for deletion
last_key = Some(raw_key);
} else {
from_rel = dk.rel;
from = StoreKey::Metadata(MetadataKey {
rel: from_rel,
lsn: Lsn::MAX,
});
last_key = None;
}
} else {
break 'meta;
}
}
break;
}
// Array to accumulate keys we can remove
let mut deteriorated: Vec<yakv::storage::Key> = Vec::new();
// currently proceed block number
let mut from_blknum = 0;
'data: loop {
let store = self.store.read().unwrap();
let mut iter = store.data.range(&from.ser()?..);
while let Some(entry) = iter.next() {
let pair = entry?;
let raw_key = pair.0;
let key = StoreKey::des(&raw_key)?;
if let StoreKey::Data(dk) = key {
if dk.lsn < cutoff {
// we have something deteriorated
let ver = PageVersion::des(&pair.1)?;
// We are still on the same page...
if from_rel == dk.rel && from_blknum == dk.blknum {
if let PageVersion::Image(_) = ver {
// We have full page image: remove all preceding deteriorated records
drop(store);
let mut store = self.store.write().unwrap();
for key in deteriorated.iter() {
store.data.remove(key)?;
}
deteriorated.clear();
// We should reset iterator and start from the current point
continue 'data;
}
// No full page image, so we can't remove deteriorated stuff
deteriorated.clear();
}
// Remember key as candidate for deletion
deteriorated.push(raw_key);
from_rel = dk.rel;
from_blknum = dk.blknum;
from = key;
} else {
// Jump to next page
from_rel = dk.rel;
from = StoreKey::Data(DataKey {
rel: from_rel,
blknum: from_blknum + 1,
lsn: Lsn(0),
});
deteriorated.clear();
}
} else {
break 'data;
}
}
break;
}
result.elapsed = now.elapsed();
Ok(result)
}
///
/// Reconstruct a page version, using the given base image and WAL records in 'data'.
///

View File

@@ -35,8 +35,8 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_GC_HORIZON: u64 = 1 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;

View File

@@ -1,8 +1,8 @@
use anyhow::{anyhow, Result};
use lz4_flex;
use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use std::convert::TryInto;
use yakv::storage::{Key, Storage, StorageIterator, Value};
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
@@ -27,79 +27,85 @@ pub struct ToastIterator<'a> {
impl<'a> Iterator for ToastIterator<'a> {
type Item = Result<(Key, Value)>;
fn next(&mut self) -> Option<Self::Item> {
let mut toast: Option<Vec<u8>> = None;
let mut next_segno = 0u16;
let mut toast: Option<Vec<u8>> = None;
let mut next_segno = 0u16;
while let Some(elem) = self.iter.next() {
if let Ok((key,value)) = elem {
let key_len = key.len();
let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len-2..].try_into().unwrap());
let key = key[..key_len-4].to_vec();
if n_segments != 0 { // TOAST
assert_eq!(segno, next_segno);
if next_segno == 0 {
toast = Some(Vec::with_capacity(n_segments as usize * TOAST_SEGMENT_SIZE))
}
toast.as_mut().unwrap().extend_from_slice(&value);
next_segno = segno + 1;
if next_segno == n_segments {
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
}
} else {
return Some(Ok((key, value)));
}
} else {
return Some(elem)
}
if let Ok((key, value)) = elem {
let key_len = key.len();
let n_segments =
u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap());
let key = key[..key_len - 4].to_vec();
if n_segments != 0 {
// TOAST
assert_eq!(segno, next_segno);
if next_segno == 0 {
toast = Some(Vec::with_capacity(n_segments as usize * TOAST_SEGMENT_SIZE))
}
toast.as_mut().unwrap().extend_from_slice(&value);
next_segno = segno + 1;
if next_segno == n_segments {
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
}
} else {
return Some(Ok((key, value)));
}
} else {
return Some(elem);
}
}
assert_eq!(next_segno, 0);
None
assert_eq!(next_segno, 0);
None
}
}
impl<'a> DoubleEndedIterator for ToastIterator<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
let mut toast: Option<Vec<u8>> = None;
let mut next_segno = 0u16;
let mut toast: Option<Vec<u8>> = None;
let mut next_segno = 0u16;
while let Some(elem) = self.iter.next_back() {
if let Ok((key,value)) = elem {
let key_len = key.len();
let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len-2..].try_into().unwrap());
let key = key[..key_len-4].to_vec();
if n_segments != 0 { // TOAST
assert!(segno+1 == next_segno || next_segno == 0);
if next_segno == 0 {
let len = (n_segments-1) as usize * TOAST_SEGMENT_SIZE + value.len();
let mut vec = vec![0u8; len];
vec[len - value.len()..].copy_from_slice(&value);
toast = Some(vec);
} else {
toast.as_mut().unwrap()[segno as usize*TOAST_SEGMENT_SIZE..(segno+1) as usize*TOAST_SEGMENT_SIZE].copy_from_slice(&value);
}
next_segno = segno;
if next_segno == 0 {
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
}
} else {
return Some(Ok((key, value)));
}
} else {
return Some(elem)
}
if let Ok((key, value)) = elem {
let key_len = key.len();
let n_segments =
u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap());
let key = key[..key_len - 4].to_vec();
if n_segments != 0 {
// TOAST
assert!(segno + 1 == next_segno || next_segno == 0);
if next_segno == 0 {
let len = (n_segments - 1) as usize * TOAST_SEGMENT_SIZE + value.len();
let mut vec = vec![0u8; len];
vec[len - value.len()..].copy_from_slice(&value);
toast = Some(vec);
} else {
toast.as_mut().unwrap()[segno as usize * TOAST_SEGMENT_SIZE
..(segno + 1) as usize * TOAST_SEGMENT_SIZE]
.copy_from_slice(&value);
}
next_segno = segno;
if next_segno == 0 {
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
}
} else {
return Some(Ok((key, value)));
}
} else {
return Some(elem);
}
}
assert_eq!(next_segno, 0);
None
assert_eq!(next_segno, 0);
None
}
}
@@ -111,7 +117,7 @@ impl ToastStore {
Ok(ToastStore {
db: Storage::open(
&path.join("pageserver.db"),
Some(&path.join("pageserver.log")),
None, //Some(&path.join("pageserver.log")),
CACHE_SIZE,
CHECKPOINT_INTERVAL,
)?,
@@ -122,20 +128,21 @@ impl ToastStore {
pub fn put(&mut self, key: &Key, value: &Value) -> Result<()> {
let mut tx = self.db.start_transaction();
let value_len = value.len();
let mut key = key.clone();
self.committed = false;
let mut key = key.clone();
self.committed = false;
if value_len >= TOAST_SEGMENT_SIZE {
let compressed_data = lz4_flex::compress_prepend_size(value);
let compressed_data_len = compressed_data.len();
let mut offs: usize = 0;
let mut segno = 0u16;
let n_segments = ((compressed_data_len + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16;
assert!(n_segments != 0);
key.extend_from_slice(&n_segments.to_be_bytes());
key.extend_from_slice(&[0u8;2]);
let key_len = key.len();
let n_segments =
((compressed_data_len + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16;
assert!(n_segments != 0);
key.extend_from_slice(&n_segments.to_be_bytes());
key.extend_from_slice(&[0u8; 2]);
let key_len = key.len();
while offs + TOAST_SEGMENT_SIZE <= compressed_data_len {
key[key_len-2..].copy_from_slice(&segno.to_be_bytes());
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(
&key,
&compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(),
@@ -144,19 +151,16 @@ impl ToastStore {
segno += 1;
}
if offs < compressed_data_len {
key[key_len-2..].copy_from_slice(&segno.to_be_bytes());
tx.put(
&key,
&compressed_data[offs..].to_vec(),
)?;
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(&key, &compressed_data[offs..].to_vec())?;
}
} else {
key.extend_from_slice(&[0u8;4]);
key.extend_from_slice(&[0u8; 4]);
tx.put(&key, value)?;
}
if tx.get_cache_info().pinned > COMMIT_THRESHOLD {
tx.commit()?;
self.committed = true;
self.committed = true;
} else {
tx.delay()?;
}
@@ -174,63 +178,65 @@ impl ToastStore {
}
pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
let from = match range.start_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8;4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8;4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded
};
let till = match range.end_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8;4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8;4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded
};
ToastIterator { iter: self.db.range((from,till)) }
let from = match range.start_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8; 4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8; 4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded,
};
let till = match range.end_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8; 4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8; 4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded,
};
ToastIterator {
iter: self.db.range((from, till)),
}
}
pub fn remove(&mut self, key: &Key) -> Result<()> {
let mut tx = self.db.start_transaction();
let mut min_key = key.clone();
let mut max_key = key.clone();
min_key.extend_from_slice(&[0u8; 4]);
max_key.extend_from_slice(&[0xFFu8; 4]);
let mut iter = tx.range(&min_key..&max_key);
self.committed = false;
if let Some(entry) = iter.next_back() {
let mut key = entry?.0.clone();
let key_len = key.len();
let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap());
if n_segments != 0 { // TOAST
for i in 0..n_segments {
key[key_len-2..].copy_from_slice(&i.to_be_bytes());
tx.remove(&key)?;
}
} else {
tx.remove(&key)?;
}
}
let mut tx = self.db.start_transaction();
let mut min_key = key.clone();
let mut max_key = key.clone();
min_key.extend_from_slice(&[0u8; 4]);
max_key.extend_from_slice(&[0xFFu8; 4]);
let mut iter = tx.range(&min_key..&max_key);
self.committed = false;
if let Some(entry) = iter.next() {
let mut key = entry?.0.clone();
let key_len = key.len();
let n_segments = u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
if n_segments != 0 {
// TOAST
for i in 0..n_segments {
key[key_len - 2..].copy_from_slice(&i.to_be_bytes());
tx.remove(&key)?;
}
} else {
tx.remove(&key)?;
}
}
if tx.get_cache_info().pinned > COMMIT_THRESHOLD {
tx.commit()?;
self.committed = true;
self.committed = true;
} else {
tx.delay()?;
}
Ok(())
}
Ok(())
}
}

View File

@@ -22,6 +22,7 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use log::*;
use rand::Rng;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
@@ -53,7 +54,7 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::XLogRecord;
const WAL_REDO_WORKERS: usize = 2;
const WAL_REDO_WORKERS: usize = 1;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
@@ -142,7 +143,7 @@ pub struct PostgresRedoManager {
conf: &'static PageServerConf,
runtime: tokio::runtime::Runtime,
process: Mutex<Option<PostgresRedoProcess>>,
workers: [Mutex<Option<PostgresRedoProcess>>; WAL_REDO_WORKERS],
}
#[derive(Debug)]
@@ -197,7 +198,9 @@ impl WalRedoManager for PostgresRedoManager {
start_time = Instant::now();
let result = {
let mut process_guard = self.process.lock().unwrap();
let mut process_guard = self.workers[rand::thread_rng().gen_range(0..WAL_REDO_WORKERS)]
.lock()
.unwrap();
lock_time = Instant::now();
// launch the WAL redo process on first use
@@ -239,7 +242,7 @@ impl PostgresRedoManager {
runtime,
tenantid,
conf,
process: Mutex::new(None),
workers: [(); WAL_REDO_WORKERS].map(|_| Mutex::new(None)),
}
}