diff --git a/Cargo.lock b/Cargo.lock index e6ca7717c6..0075a7a433 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2572,6 +2572,8 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" [[package]] name = "yakv" version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17eda78e0aabbe323628af9b09766827e5b321b5e742cb38a6dd8f09c60a0c1" dependencies = [ "anyhow", "crc32c", diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs index 01b56d5e17..05d477b143 100644 --- a/pageserver/src/buffered_repository.rs +++ b/pageserver/src/buffered_repository.rs @@ -754,55 +754,77 @@ impl Timeline for BufferedTimeline { .ser()? .to_vec(); let till = StoreKey::Data(DataKey { rel, blknum, lsn }).ser()?.to_vec(); - let store = self.store.read().unwrap(); - let mut iter = store.data.range(&from..=&till); + //let mut reconstruct_key: Option = None; + let result = { + let store = self.store.read().unwrap(); + let mut iter = store.data.range(&from..=&till); - // locate latest version with LSN <= than requested - if let Some(pair) = iter.next_back() { - let ver = PageVersion::des(&pair?.1)?; - match ver { - PageVersion::Image(img) => Ok(img), // already materialized: we are done - PageVersion::Delta(rec) => { - let mut will_init = rec.will_init; - let mut data = PageReconstructData { - records: Vec::new(), - page_img: None, - }; - data.records.push((lsn, rec)); - // loop until we locate full page image or initialization WAL record - // FIXME-KK: cross-timelines histories are not handled now - while !will_init { - if let Some(entry) = iter.next_back() { - let pair = entry?; - let key = StoreKey::des(&pair.0)?; - let ver = PageVersion::des(&pair.1)?; - if let StoreKey::Data(dk) = key { - assert!(dk.rel == rel); // check that we don't jump to previous relish before locating full image - match ver { - PageVersion::Image(img) => { - data.page_img = Some(img); - break; - } - PageVersion::Delta(rec) => { - will_init = rec.will_init; - data.records.push((dk.lsn, rec)); + // locate latest version with LSN <= than requested + if let Some(entry) = iter.next_back() { + let pair = entry?; + let key = StoreKey::des(&pair.0)?; + if let StoreKey::Data(dk) = key { + let ver = PageVersion::des(&pair.1)?; + match ver { + PageVersion::Image(img) => Ok(img), // already materialized: we are done + PageVersion::Delta(rec) => { + let mut will_init = rec.will_init; + let mut data = PageReconstructData { + records: Vec::new(), + page_img: None, + }; + //reconstruct_key = Some(dk); + data.records.push((dk.lsn, rec)); + // loop until we locate full page image or initialization WAL record + // FIXME-KK: cross-timelines histories are not handled now + while !will_init { + if let Some(entry) = iter.next_back() { + let pair = entry?; + let key = StoreKey::des(&pair.0)?; + let ver = PageVersion::des(&pair.1)?; + if let StoreKey::Data(dk) = key { + assert!(dk.rel == rel); // check that we don't jump to previous relish before locating full image + match ver { + PageVersion::Image(img) => { + data.page_img = Some(img); + break; + } + PageVersion::Delta(rec) => { + will_init = rec.will_init; + data.records.push((dk.lsn, rec)); + } + } + } else { + bail!("Unexpected key type {:?}", key); } + } else { + bail!("Base image not found for relish {} at {}", rel, lsn); } - } else { - bail!("Unexpected key type {:?}", key); } - } else { - bail!("Base image not found for relish {} at {}", rel, lsn); + RECONSTRUCT_TIME.observe_closure_duration(|| { + self.reconstruct_page(rel, blknum, lsn, data) + }) } } - RECONSTRUCT_TIME - .observe_closure_duration(|| self.reconstruct_page(rel, blknum, lsn, data)) + } else { + bail!("Unexpected key type {:?}", key); } + } else { + warn!("block {} of relish {} not found at {}", blknum, rel, lsn); + Ok(ZERO_PAGE.clone()) + } + }; + /* + if let Some(key) = reconstruct_key { + if let Ok(img) = &result { + let mut store = self.store.write().unwrap(); + store + .data + .put(&StoreKey::Data(key).ser()?, &PageVersion::Image(img.clone()).ser()?)?; } - } else { - warn!("block {} of relish {} not found at {}", blknum, rel, lsn); - Ok(ZERO_PAGE.clone()) } + */ + result } fn get_relish_size(&self, rel: RelishTag, lsn: Lsn) -> Result> { diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index a2c0c47460..a1bde5f347 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -33,7 +33,7 @@ pub mod defaults { // Minimal size of WAL records chain to trigger materialization of the page pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0; - pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_millis(1000); + pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs index 074a15fb13..bf5cea4301 100644 --- a/pageserver/src/toast_store.rs +++ b/pageserver/src/toast_store.rs @@ -1,23 +1,13 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use lz4_flex; -use serde::{Deserialize, Serialize}; -use std::ops::RangeBounds; +use std::ops::{Bound, RangeBounds}; use std::path::Path; -use tracing::*; +use std::convert::TryInto; use yakv::storage::{Key, Storage, StorageIterator, Value}; -use zenith_utils::bin_ser::BeSer; - const TOAST_SEGMENT_SIZE: usize = 2 * 1024; const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024; -const MAIN_CACHE_SIZE: usize = 8 * 1024; // 64Mb -const TOAST_CACHE_SIZE: usize = 1024; // 8Mb -const MAIN_COMMIT_THRESHOLD: usize = MAIN_CACHE_SIZE / 2; -const TOAST_COMMIT_THRESHOLD: usize = TOAST_CACHE_SIZE / 2; - -const TOAST_VALUE_TAG: u8 = 0; -const PLAIN_VALUE_TAG: u8 = 1; - -type ToastId = u32; +const CACHE_SIZE: usize = 32 * 1024; // 256Mb +const COMMIT_THRESHOLD: usize = CACHE_SIZE / 2; /// /// Toast storage consistof two KV databases: one for storing main index @@ -26,50 +16,90 @@ type ToastId = u32; /// data locality and reduce key size for TOAST segments. /// pub struct ToastStore { - main: Storage, // primary storage - toast: Storage, // storage for TOAST segments - next_id: ToastId, // counter used to identify new TOAST segments + db: Storage, // key-value database pub committed: bool, // last transaction was committed (not delayed) } -/// -/// TOAST reference -/// -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -struct ToastRef { - toast_id: ToastId, // assigned TOAST indetifier - orig_size: u32, // Original (uncompressed) value size - compressed_size: u32, // Compressed object size -} - -/// -/// Identifier of TOAST segment. -/// -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -struct ToastSegId { - toast_id: ToastId, - segno: u32, // segment number used to extract segments in proper order -} - pub struct ToastIterator<'a> { - store: &'a ToastStore, iter: StorageIterator<'a>, } impl<'a> Iterator for ToastIterator<'a> { type Item = Result<(Key, Value)>; fn next(&mut self) -> Option { - self.iter.next().and_then(|res| { - Some(res.and_then(|(key, value)| Ok((key, self.store.detoast(value)?)))) - }) + let mut toast: Option> = None; + let mut next_segno = 0u16; + while let Some(elem) = self.iter.next() { + if let Ok((key,value)) = elem { + let key_len = key.len(); + let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap()); + let segno = u16::from_be_bytes(key[key_len-2..].try_into().unwrap()); + let key = key[..key_len-4].to_vec(); + if n_segments != 0 { // TOAST + assert_eq!(segno, next_segno); + if next_segno == 0 { + toast = Some(Vec::with_capacity(n_segments as usize * TOAST_SEGMENT_SIZE)) + } + toast.as_mut().unwrap().extend_from_slice(&value); + next_segno = segno + 1; + if next_segno == n_segments { + let res = lz4_flex::decompress_size_prepended(&toast.unwrap()); + return Some(if let Ok(decompressed_data) = res { + Ok((key, decompressed_data)) + } else { + Err(anyhow!(res.unwrap_err())) + }); + } + } else { + return Some(Ok((key, value))); + } + } else { + return Some(elem) + } + } + assert_eq!(next_segno, 0); + None } } impl<'a> DoubleEndedIterator for ToastIterator<'a> { fn next_back(&mut self) -> Option { - self.iter.next_back().and_then(|res| { - Some(res.and_then(|(key, value)| Ok((key, self.store.detoast(value)?)))) - }) + let mut toast: Option> = None; + let mut next_segno = 0u16; + while let Some(elem) = self.iter.next_back() { + if let Ok((key,value)) = elem { + let key_len = key.len(); + let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap()); + let segno = u16::from_be_bytes(key[key_len-2..].try_into().unwrap()); + let key = key[..key_len-4].to_vec(); + if n_segments != 0 { // TOAST + assert!(segno+1 == next_segno || next_segno == 0); + if next_segno == 0 { + let len = (n_segments-1) as usize * TOAST_SEGMENT_SIZE + value.len(); + let mut vec = vec![0u8; len]; + vec[len - value.len()..].copy_from_slice(&value); + toast = Some(vec); + } else { + toast.as_mut().unwrap()[segno as usize*TOAST_SEGMENT_SIZE..(segno+1) as usize*TOAST_SEGMENT_SIZE].copy_from_slice(&value); + } + next_segno = segno; + if next_segno == 0 { + let res = lz4_flex::decompress_size_prepended(&toast.unwrap()); + return Some(if let Ok(decompressed_data) = res { + Ok((key, decompressed_data)) + } else { + Err(anyhow!(res.unwrap_err())) + }); + } + } else { + return Some(Ok((key, value))); + } + } else { + return Some(elem) + } + } + assert_eq!(next_segno, 0); + None } } @@ -79,198 +109,128 @@ impl<'a> DoubleEndedIterator for ToastIterator<'a> { impl ToastStore { pub fn new(path: &Path) -> Result { Ok(ToastStore { - main: Storage::open( - &path.join("main.db"), - Some(&path.join("main.log")), - MAIN_CACHE_SIZE, + db: Storage::open( + &path.join("pageserver.db"), + Some(&path.join("pageserver.log")), + CACHE_SIZE, CHECKPOINT_INTERVAL, )?, - toast: Storage::open( - &path.join("toast.db"), - Some(&path.join("toast.log")), - TOAST_CACHE_SIZE, - CHECKPOINT_INTERVAL, - )?, - next_id: 0, committed: false, }) } pub fn put(&mut self, key: &Key, value: &Value) -> Result<()> { - let mut main_tx = self.main.start_transaction(); + let mut tx = self.db.start_transaction(); let value_len = value.len(); - let main_pinned; + let mut key = key.clone(); + self.committed = false; if value_len >= TOAST_SEGMENT_SIZE { - let mut toast_tx = self.toast.start_transaction(); - if self.next_id == 0 { - self.next_id = toast_tx - .iter() - .next_back() - .transpose()? - .map_or(0u32, |(key, _value)| { - ToastSegId::des(&key).unwrap().toast_id - }); - } - self.next_id += 1; - let toast_id = self.next_id; - let compressed_data = lz4_flex::compress(value); + let compressed_data = lz4_flex::compress_prepend_size(value); let compressed_data_len = compressed_data.len(); let mut offs: usize = 0; - let mut segno = 0u32; + let mut segno = 0u16; + let n_segments = ((compressed_data_len + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16; + assert!(n_segments != 0); + key.extend_from_slice(&n_segments.to_be_bytes()); + key.extend_from_slice(&[0u8;2]); + let key_len = key.len(); while offs + TOAST_SEGMENT_SIZE <= compressed_data_len { - toast_tx.put( - &ToastSegId { toast_id, segno }.ser()?, + key[key_len-2..].copy_from_slice(&segno.to_be_bytes()); + tx.put( + &key, &compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(), )?; offs += TOAST_SEGMENT_SIZE; segno += 1; } if offs < compressed_data_len { - toast_tx.put( - &ToastSegId { toast_id, segno }.ser()?, + key[key_len-2..].copy_from_slice(&segno.to_be_bytes()); + tx.put( + &key, &compressed_data[offs..].to_vec(), )?; } - let mut value = ToastRef { - toast_id, - orig_size: value_len as u32, - compressed_size: compressed_data_len as u32, - } - .ser()?; - value.insert(0, TOAST_VALUE_TAG); - main_tx.put(key, &value)?; - main_pinned = main_tx.get_cache_info().pinned; - // If we are going to commit main storage, then we have to commit toast storage first to avoid dangling references - if main_pinned > MAIN_COMMIT_THRESHOLD - || toast_tx.get_cache_info().pinned > TOAST_COMMIT_THRESHOLD - { - toast_tx.commit()?; - } else { - toast_tx.delay()?; - } } else { - let mut vec = Vec::with_capacity(value.len() + 1); - vec.push(PLAIN_VALUE_TAG); - vec.extend_from_slice(&value); - main_tx.put(key, &vec)?; - main_pinned = main_tx.get_cache_info().pinned; + key.extend_from_slice(&[0u8;4]); + tx.put(&key, value)?; } - if main_pinned > MAIN_COMMIT_THRESHOLD { - main_tx.commit()?; - self.committed = true; + if tx.get_cache_info().pinned > COMMIT_THRESHOLD { + tx.commit()?; + self.committed = true; } else { - main_tx.delay()?; - self.committed = false; + tx.delay()?; } Ok(()) } pub fn checkpoint(&self) -> Result<()> { - let mut main_tx = self.main.start_transaction(); - let mut toast_tx = self.toast.start_transaction(); - toast_tx.commit()?; - main_tx.commit()?; + let mut tx = self.db.start_transaction(); + tx.commit()?; Ok(()) } - pub fn get(&self, key: &[u8]) -> Result> { - self.main - .get(&key.to_vec()) - .transpose() - .and_then(|res| Some(res.and_then(|value| Ok(self.detoast(value)?)))) - .transpose() - } - pub fn iter(&self) -> ToastIterator<'_> { self.range(..) } pub fn range>(&self, range: R) -> ToastIterator<'_> { - ToastIterator { - store: self, - iter: self.main.range(range), - } + let from = match range.start_bound() { + Bound::Included(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0u8;4]); + Bound::Included(key) + } + Bound::Excluded(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0u8;4]); + Bound::Excluded(key) + } + _ => Bound::Unbounded + }; + let till = match range.end_bound() { + Bound::Included(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0xFFu8;4]); + Bound::Included(key) + } + Bound::Excluded(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0xFFu8;4]); + Bound::Excluded(key) + } + _ => Bound::Unbounded + }; + ToastIterator { iter: self.db.range((from,till)) } } pub fn remove(&mut self, key: &Key) -> Result<()> { - let mut main_tx = self.main.start_transaction(); - if let Some(value) = main_tx.get(key)? { - main_tx.remove(key)?; - let main_pinned = main_tx.get_cache_info().pinned; - if value[0] == TOAST_VALUE_TAG { - let mut toast_tx = self.toast.start_transaction(); - let toast_ref = ToastRef::des(&value[1..])?; - let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1) - / TOAST_SEGMENT_SIZE) as u32; - for segno in 0..n_segments { - toast_tx.remove( - &ToastSegId { - toast_id: toast_ref.toast_id, - segno, - } - .ser()?, - )?; - } - // If we are going to commit main storage, then we have to commit toast storage first to avoid dangling references - if main_pinned > MAIN_COMMIT_THRESHOLD - || toast_tx.get_cache_info().pinned > TOAST_COMMIT_THRESHOLD - { - toast_tx.commit()?; - } else { - toast_tx.delay()?; - } - } - if main_pinned > MAIN_COMMIT_THRESHOLD { - main_tx.commit()?; - self.committed = true; - } else { - main_tx.delay()?; - self.committed = false; - } + let mut tx = self.db.start_transaction(); + let mut min_key = key.clone(); + let mut max_key = key.clone(); + min_key.extend_from_slice(&[0u8; 4]); + max_key.extend_from_slice(&[0xFFu8; 4]); + let mut iter = tx.range(&min_key..&max_key); + self.committed = false; + if let Some(entry) = iter.next_back() { + let mut key = entry?.0.clone(); + let key_len = key.len(); + let n_segments = u16::from_be_bytes(key[key_len-4..key_len-2].try_into().unwrap()); + if n_segments != 0 { // TOAST + for i in 0..n_segments { + key[key_len-2..].copy_from_slice(&i.to_be_bytes()); + tx.remove(&key)?; + } + } else { + tx.remove(&key)?; + } + } + if tx.get_cache_info().pinned > COMMIT_THRESHOLD { + tx.commit()?; + self.committed = true; } else { - self.committed = false; + tx.delay()?; } - Ok(()) - } - - pub fn close(&self) -> Result<()> { - self.toast.close()?; // commit and close TOAST store first to avoid dangling references - self.main.close()?; - Ok(()) - } - - fn detoast(&self, mut value: Value) -> Result { - if value[0] == TOAST_VALUE_TAG { - // TOAST chain - let toast_ref = ToastRef::des(&value[1..])?; - let mut toast: Value = Vec::with_capacity(toast_ref.orig_size as usize); - let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1) - / TOAST_SEGMENT_SIZE) as u32; - let from = ToastSegId { - toast_id: toast_ref.toast_id, - segno: 0, - } - .ser()?; - let till = ToastSegId { - toast_id: toast_ref.toast_id, - segno: n_segments, - } - .ser()?; - for seg in self.toast.range(from..till) { - toast.extend_from_slice(&seg?.1); - } - Ok(lz4_flex::decompress(&toast, toast_ref.orig_size as usize)?) - } else { - value.remove(0); // remove toast tag - Ok(value) - } - } + Ok(()) + } } -impl Drop for ToastStore { - fn drop(&mut self) { - info!("Storage closed"); - // FIXME-KK: better call close() explicitly - self.close().unwrap(); - } -} diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 8cd696e8f3..6630a9947d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -53,6 +53,8 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; use postgres_ffi::XLogRecord; +const WAL_REDO_WORKERS: usize = 2; + /// /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. ///