From a3e94e888a6fd008c681b6fca9b7217b84cc2219 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 30 Oct 2021 13:10:04 +0300 Subject: [PATCH] Implement garbage collector for buffered repository --- control_plane/src/compute.rs | 1 + pageserver/src/buffered_repository.rs | 124 +++++++++++- pageserver/src/lib.rs | 4 +- pageserver/src/toast_store.rs | 270 +++++++++++++------------- pageserver/src/walredo.rs | 11 +- 5 files changed, 267 insertions(+), 143 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index c0107a431e..d9027f31f6 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -298,6 +298,7 @@ impl PostgresNode { conf.append("max_replication_slots", "10"); conf.append("hot_standby", "on"); conf.append("shared_buffers", "1MB"); + conf.append("max_wal_size", "100GB"); conf.append("fsync", "off"); conf.append("max_connections", "100"); conf.append("wal_sender_timeout", "0"); diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs index 05d477b143..7b4c2318d6 100644 --- a/pageserver/src/buffered_repository.rs +++ b/pageserver/src/buffered_repository.rs @@ -814,7 +814,7 @@ impl Timeline for BufferedTimeline { Ok(ZERO_PAGE.clone()) } }; - /* + /* if let Some(key) = reconstruct_key { if let Ok(img) = &result { let mut store = self.store.write().unwrap(); @@ -823,7 +823,7 @@ impl Timeline for BufferedTimeline { .put(&StoreKey::Data(key).ser()?, &PageVersion::Image(img.clone()).ser()?)?; } } - */ + */ result } @@ -1314,11 +1314,125 @@ impl BufferedTimeline { /// within a layer file. We can only remove the whole file if it's fully /// obsolete. /// - pub fn gc_timeline(&self, _retain_lsns: Vec, _cutoff: Lsn) -> Result { - // TODO: not implemented yet for buffred storage - let result: GcResult = Default::default(); + pub fn gc_timeline(&self, _retain_lsns: Vec, cutoff: Lsn) -> Result { + let now = Instant::now(); + let mut result: GcResult = Default::default(); + + let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered(); + + info!("GC starting"); + + let mut from_rel = RelishTag::Relation(RelTag { + spcnode: 0, + dbnode: 0, + relnode: 0, + forknum: 0, + }); + let mut from = StoreKey::Metadata(MetadataKey { + rel: from_rel, + lsn: Lsn(0), + }); + + // We can not remove deteriorated version immediately, we need to check first that successor exists + let mut last_key: Option = None; + + 'meta: loop { + let store = self.store.read().unwrap(); + let mut iter = store.data.range(&from.ser()?..); + while let Some(entry) = iter.next() { + let raw_key = entry?.0; + let key = StoreKey::des(&raw_key)?; + if let StoreKey::Metadata(dk) = key { + if dk.lsn < cutoff { + // we have something deteriorated + if let Some(prev_key) = last_key { + // We are still on the same relish... + if from_rel == dk.rel { + drop(store); + let mut store = self.store.write().unwrap(); + store.data.remove(&prev_key)?; + last_key = None; + // We should reset iterator and start from the current point + continue 'meta; + } + } + from_rel = dk.rel; + from = key; + // Remember key as candidate for deletion + last_key = Some(raw_key); + } else { + from_rel = dk.rel; + from = StoreKey::Metadata(MetadataKey { + rel: from_rel, + lsn: Lsn::MAX, + }); + last_key = None; + } + } else { + break 'meta; + } + } + break; + } + + // Array to accumulate keys we can remove + let mut deteriorated: Vec = Vec::new(); + // currently proceed block number + let mut from_blknum = 0; + + 'data: loop { + let store = self.store.read().unwrap(); + let mut iter = store.data.range(&from.ser()?..); + while let Some(entry) = iter.next() { + let pair = entry?; + let raw_key = pair.0; + let key = StoreKey::des(&raw_key)?; + if let StoreKey::Data(dk) = key { + if dk.lsn < cutoff { + // we have something deteriorated + let ver = PageVersion::des(&pair.1)?; + // We are still on the same page... + if from_rel == dk.rel && from_blknum == dk.blknum { + if let PageVersion::Image(_) = ver { + // We have full page image: remove all preceding deteriorated records + drop(store); + let mut store = self.store.write().unwrap(); + for key in deteriorated.iter() { + store.data.remove(key)?; + } + deteriorated.clear(); + // We should reset iterator and start from the current point + continue 'data; + } + // No full page image, so we can't remove deteriorated stuff + deteriorated.clear(); + } + // Remember key as candidate for deletion + deteriorated.push(raw_key); + from_rel = dk.rel; + from_blknum = dk.blknum; + from = key; + } else { + // Jump to next page + from_rel = dk.rel; + from = StoreKey::Data(DataKey { + rel: from_rel, + blknum: from_blknum + 1, + lsn: Lsn(0), + }); + deteriorated.clear(); + } + } else { + break 'data; + } + } + break; + } + + result.elapsed = now.elapsed(); Ok(result) } + /// /// Reconstruct a page version, using the given base image and WAL records in 'data'. /// diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index a1bde5f347..60bc3c7f5e 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -35,8 +35,8 @@ pub mod defaults { pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0; pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); - pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; - pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); + pub const DEFAULT_GC_HORIZON: u64 = 1 * 1024 * 1024; + pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100; diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs index bf5cea4301..07f95e8c87 100644 --- a/pageserver/src/toast_store.rs +++ b/pageserver/src/toast_store.rs @@ -1,8 +1,8 @@ use anyhow::{anyhow, Result}; use lz4_flex; +use std::convert::TryInto; use std::ops::{Bound, RangeBounds}; use std::path::Path; -use std::convert::TryInto; use yakv::storage::{Key, Storage, StorageIterator, Value}; const TOAST_SEGMENT_SIZE: usize = 2 * 1024; const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024; @@ -27,79 +27,85 @@ pub struct ToastIterator<'a> { impl<'a> Iterator for ToastIterator<'a> { type Item = Result<(Key, Value)>; fn next(&mut self) -> Option { - let mut toast: Option> = None; - let mut next_segno = 0u16; + let mut toast: Option> = None; + let mut next_segno = 0u16; while let Some(elem) = self.iter.next() { - if let Ok((key,value)) = elem { - let key_len = key.len(); - let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap()); - let segno = u16::from_be_bytes(key[key_len-2..].try_into().unwrap()); - let key = key[..key_len-4].to_vec(); - if n_segments != 0 { // TOAST - assert_eq!(segno, next_segno); - if next_segno == 0 { - toast = Some(Vec::with_capacity(n_segments as usize * TOAST_SEGMENT_SIZE)) - } - toast.as_mut().unwrap().extend_from_slice(&value); - next_segno = segno + 1; - if next_segno == n_segments { - let res = lz4_flex::decompress_size_prepended(&toast.unwrap()); - return Some(if let Ok(decompressed_data) = res { - Ok((key, decompressed_data)) - } else { - Err(anyhow!(res.unwrap_err())) - }); - } - } else { - return Some(Ok((key, value))); - } - } else { - return Some(elem) - } + if let Ok((key, value)) = elem { + let key_len = key.len(); + let n_segments = + u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap()); + let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap()); + let key = key[..key_len - 4].to_vec(); + if n_segments != 0 { + // TOAST + assert_eq!(segno, next_segno); + if next_segno == 0 { + toast = Some(Vec::with_capacity(n_segments as usize * TOAST_SEGMENT_SIZE)) + } + toast.as_mut().unwrap().extend_from_slice(&value); + next_segno = segno + 1; + if next_segno == n_segments { + let res = lz4_flex::decompress_size_prepended(&toast.unwrap()); + return Some(if let Ok(decompressed_data) = res { + Ok((key, decompressed_data)) + } else { + Err(anyhow!(res.unwrap_err())) + }); + } + } else { + return Some(Ok((key, value))); + } + } else { + return Some(elem); + } } - assert_eq!(next_segno, 0); - None + assert_eq!(next_segno, 0); + None } } impl<'a> DoubleEndedIterator for ToastIterator<'a> { fn next_back(&mut self) -> Option { - let mut toast: Option> = None; - let mut next_segno = 0u16; + let mut toast: Option> = None; + let mut next_segno = 0u16; while let Some(elem) = self.iter.next_back() { - if let Ok((key,value)) = elem { - let key_len = key.len(); - let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap()); - let segno = u16::from_be_bytes(key[key_len-2..].try_into().unwrap()); - let key = key[..key_len-4].to_vec(); - if n_segments != 0 { // TOAST - assert!(segno+1 == next_segno || next_segno == 0); - if next_segno == 0 { - let len = (n_segments-1) as usize * TOAST_SEGMENT_SIZE + value.len(); - let mut vec = vec![0u8; len]; - vec[len - value.len()..].copy_from_slice(&value); - toast = Some(vec); - } else { - toast.as_mut().unwrap()[segno as usize*TOAST_SEGMENT_SIZE..(segno+1) as usize*TOAST_SEGMENT_SIZE].copy_from_slice(&value); - } - next_segno = segno; - if next_segno == 0 { - let res = lz4_flex::decompress_size_prepended(&toast.unwrap()); - return Some(if let Ok(decompressed_data) = res { - Ok((key, decompressed_data)) - } else { - Err(anyhow!(res.unwrap_err())) - }); - } - } else { - return Some(Ok((key, value))); - } - } else { - return Some(elem) - } + if let Ok((key, value)) = elem { + let key_len = key.len(); + let n_segments = + u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap()); + let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap()); + let key = key[..key_len - 4].to_vec(); + if n_segments != 0 { + // TOAST + assert!(segno + 1 == next_segno || next_segno == 0); + if next_segno == 0 { + let len = (n_segments - 1) as usize * TOAST_SEGMENT_SIZE + value.len(); + let mut vec = vec![0u8; len]; + vec[len - value.len()..].copy_from_slice(&value); + toast = Some(vec); + } else { + toast.as_mut().unwrap()[segno as usize * TOAST_SEGMENT_SIZE + ..(segno + 1) as usize * TOAST_SEGMENT_SIZE] + .copy_from_slice(&value); + } + next_segno = segno; + if next_segno == 0 { + let res = lz4_flex::decompress_size_prepended(&toast.unwrap()); + return Some(if let Ok(decompressed_data) = res { + Ok((key, decompressed_data)) + } else { + Err(anyhow!(res.unwrap_err())) + }); + } + } else { + return Some(Ok((key, value))); + } + } else { + return Some(elem); + } } - assert_eq!(next_segno, 0); - None + assert_eq!(next_segno, 0); + None } } @@ -111,7 +117,7 @@ impl ToastStore { Ok(ToastStore { db: Storage::open( &path.join("pageserver.db"), - Some(&path.join("pageserver.log")), + None, //Some(&path.join("pageserver.log")), CACHE_SIZE, CHECKPOINT_INTERVAL, )?, @@ -122,20 +128,21 @@ impl ToastStore { pub fn put(&mut self, key: &Key, value: &Value) -> Result<()> { let mut tx = self.db.start_transaction(); let value_len = value.len(); - let mut key = key.clone(); - self.committed = false; + let mut key = key.clone(); + self.committed = false; if value_len >= TOAST_SEGMENT_SIZE { let compressed_data = lz4_flex::compress_prepend_size(value); let compressed_data_len = compressed_data.len(); let mut offs: usize = 0; let mut segno = 0u16; - let n_segments = ((compressed_data_len + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16; - assert!(n_segments != 0); - key.extend_from_slice(&n_segments.to_be_bytes()); - key.extend_from_slice(&[0u8;2]); - let key_len = key.len(); + let n_segments = + ((compressed_data_len + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16; + assert!(n_segments != 0); + key.extend_from_slice(&n_segments.to_be_bytes()); + key.extend_from_slice(&[0u8; 2]); + let key_len = key.len(); while offs + TOAST_SEGMENT_SIZE <= compressed_data_len { - key[key_len-2..].copy_from_slice(&segno.to_be_bytes()); + key[key_len - 2..].copy_from_slice(&segno.to_be_bytes()); tx.put( &key, &compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(), @@ -144,19 +151,16 @@ impl ToastStore { segno += 1; } if offs < compressed_data_len { - key[key_len-2..].copy_from_slice(&segno.to_be_bytes()); - tx.put( - &key, - &compressed_data[offs..].to_vec(), - )?; + key[key_len - 2..].copy_from_slice(&segno.to_be_bytes()); + tx.put(&key, &compressed_data[offs..].to_vec())?; } } else { - key.extend_from_slice(&[0u8;4]); + key.extend_from_slice(&[0u8; 4]); tx.put(&key, value)?; } if tx.get_cache_info().pinned > COMMIT_THRESHOLD { tx.commit()?; - self.committed = true; + self.committed = true; } else { tx.delay()?; } @@ -174,63 +178,65 @@ impl ToastStore { } pub fn range>(&self, range: R) -> ToastIterator<'_> { - let from = match range.start_bound() { - Bound::Included(key) => { - let mut key = key.clone(); - key.extend_from_slice(&[0u8;4]); - Bound::Included(key) - } - Bound::Excluded(key) => { - let mut key = key.clone(); - key.extend_from_slice(&[0u8;4]); - Bound::Excluded(key) - } - _ => Bound::Unbounded - }; - let till = match range.end_bound() { - Bound::Included(key) => { - let mut key = key.clone(); - key.extend_from_slice(&[0xFFu8;4]); - Bound::Included(key) - } - Bound::Excluded(key) => { - let mut key = key.clone(); - key.extend_from_slice(&[0xFFu8;4]); - Bound::Excluded(key) - } - _ => Bound::Unbounded - }; - ToastIterator { iter: self.db.range((from,till)) } + let from = match range.start_bound() { + Bound::Included(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0u8; 4]); + Bound::Included(key) + } + Bound::Excluded(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0u8; 4]); + Bound::Excluded(key) + } + _ => Bound::Unbounded, + }; + let till = match range.end_bound() { + Bound::Included(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0xFFu8; 4]); + Bound::Included(key) + } + Bound::Excluded(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0xFFu8; 4]); + Bound::Excluded(key) + } + _ => Bound::Unbounded, + }; + ToastIterator { + iter: self.db.range((from, till)), + } } pub fn remove(&mut self, key: &Key) -> Result<()> { - let mut tx = self.db.start_transaction(); - let mut min_key = key.clone(); - let mut max_key = key.clone(); - min_key.extend_from_slice(&[0u8; 4]); - max_key.extend_from_slice(&[0xFFu8; 4]); - let mut iter = tx.range(&min_key..&max_key); - self.committed = false; - if let Some(entry) = iter.next_back() { - let mut key = entry?.0.clone(); - let key_len = key.len(); - let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap()); - if n_segments != 0 { // TOAST - for i in 0..n_segments { - key[key_len-2..].copy_from_slice(&i.to_be_bytes()); - tx.remove(&key)?; - } - } else { - tx.remove(&key)?; - } - } + let mut tx = self.db.start_transaction(); + let mut min_key = key.clone(); + let mut max_key = key.clone(); + min_key.extend_from_slice(&[0u8; 4]); + max_key.extend_from_slice(&[0xFFu8; 4]); + let mut iter = tx.range(&min_key..&max_key); + self.committed = false; + if let Some(entry) = iter.next() { + let mut key = entry?.0.clone(); + let key_len = key.len(); + let n_segments = u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap()); + if n_segments != 0 { + // TOAST + for i in 0..n_segments { + key[key_len - 2..].copy_from_slice(&i.to_be_bytes()); + tx.remove(&key)?; + } + } else { + tx.remove(&key)?; + } + } if tx.get_cache_info().pinned > COMMIT_THRESHOLD { tx.commit()?; - self.committed = true; + self.committed = true; } else { tx.delay()?; } - Ok(()) - } + Ok(()) + } } - diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 6630a9947d..6d3b4233ef 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -22,6 +22,7 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use log::*; +use rand::Rng; use serde::Serialize; use std::fs; use std::fs::OpenOptions; @@ -53,7 +54,7 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; use postgres_ffi::XLogRecord; -const WAL_REDO_WORKERS: usize = 2; +const WAL_REDO_WORKERS: usize = 1; /// /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. @@ -142,7 +143,7 @@ pub struct PostgresRedoManager { conf: &'static PageServerConf, runtime: tokio::runtime::Runtime, - process: Mutex>, + workers: [Mutex>; WAL_REDO_WORKERS], } #[derive(Debug)] @@ -197,7 +198,9 @@ impl WalRedoManager for PostgresRedoManager { start_time = Instant::now(); let result = { - let mut process_guard = self.process.lock().unwrap(); + let mut process_guard = self.workers[rand::thread_rng().gen_range(0..WAL_REDO_WORKERS)] + .lock() + .unwrap(); lock_time = Instant::now(); // launch the WAL redo process on first use @@ -239,7 +242,7 @@ impl PostgresRedoManager { runtime, tenantid, conf, - process: Mutex::new(None), + workers: [(); WAL_REDO_WORKERS].map(|_| Mutex::new(None)), } }