From 497258c6fe657ff7afdeaf72b9b7ab795c2bb037 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 26 Oct 2021 20:07:11 +0300 Subject: [PATCH] Do not produce error in get_page_at_lsn on missed page --- pageserver/Cargo.toml | 3 +- pageserver/src/buffered_repository.rs | 154 ++++++++---------- pageserver/src/toast_store.rs | 219 ++++++++++++++++++++++++++ 3 files changed, 289 insertions(+), 87 deletions(-) create mode 100644 pageserver/src/toast_store.rs diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 98d06a1909..e1f95cdddb 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -37,7 +37,8 @@ async-trait = "0.1" const_format = "0.2.21" tracing = "0.1.27" signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } -yakv = "0.1.3" +#yakv = { path = "../../yakv" } +yakv = "0.1.5" lz4_flex = "0.9.0" postgres_ffi = { path = "../postgres_ffi" } diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs index 4242f98691..5c97beb1f1 100644 --- a/pageserver/src/buffered_repository.rs +++ b/pageserver/src/buffered_repository.rs @@ -42,9 +42,7 @@ use crate::walredo::WalRedoManager; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; -use zenith_metrics::{ - register_histogram, register_int_gauge_vec, Histogram, IntGauge, IntGaugeVec, -}; +use zenith_metrics::{register_histogram, register_int_gauge_vec, Histogram, IntGaugeVec}; use zenith_metrics::{register_histogram_vec, HistogramVec}; use zenith_utils::bin_ser::BeSer; use zenith_utils::crashsafe_dir; @@ -376,7 +374,7 @@ impl BufferedRepository { fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> { while !tenant_mgr::shutdown_requested() { std::thread::sleep(conf.checkpoint_period); - info!("checkpointer thread for tenant {} waking up", self.tenantid); + trace!("checkpointer thread for tenant {} waking up", self.tenantid); // checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE // bytes of WAL since last checkpoint. @@ -697,15 +695,6 @@ pub struct BufferedTimeline { // With compressed KV storage them are completely different. current_logical_size: AtomicUsize, // bytes - // To avoid calling .with_label_values and formatting the tenant and timeline IDs to strings - // every time the logical size is updated, keep a direct reference to the Gauge here. - // unfortunately it doesnt forward atomic methods like .fetch_add - // so use two fields: actual size and metric - // see https://github.com/zenithdb/zenith/issues/622 for discussion - // TODO: it is possible to combine these two fields into single one using custom metric which uses SeqCst - // ordering for its operations, but involves private modules, and macro trickery - current_logical_size_gauge: IntGauge, - /// If `true`, will backup its timeline files to remote storage after freezing. upload_relishes: bool, @@ -753,6 +742,8 @@ impl Timeline for BufferedTimeline { rel ); } + // FIXME-KK; navigation to ancestor timelines is not yet supported + assert!(self.ancestor_timeline.is_none()); debug_assert!(lsn <= self.get_last_record_lsn()); let from = StoreKey::Data(DataKey { @@ -809,7 +800,8 @@ impl Timeline for BufferedTimeline { } } } else { - bail!("relish {} not found at {}", rel, lsn); + warn!("block {} of relish {} not found at {}", blknum, rel, lsn); + Ok(ZERO_PAGE.clone()) } } @@ -951,7 +943,7 @@ impl Timeline for BufferedTimeline { } impl RelishStore { - fn load_metadata(&mut self) -> Result<()> { + fn load_metadata(&mut self) -> Result<&'_ mut HashMap> { if self.meta.is_none() { let mut meta: HashMap = HashMap::new(); let mut till = StoreKey::Metadata(MetadataKey { @@ -964,7 +956,7 @@ impl RelishStore { let pair = entry?; let key = StoreKey::des(&pair.0)?; if let StoreKey::Metadata(last) = key { - let metadata = MetadataValue::des(&pair.0)?; + let metadata = MetadataValue::des(&pair.1)?; if let Some(size) = metadata.size { // igonore dropped relations meta.insert( @@ -988,7 +980,7 @@ impl RelishStore { } self.meta = Some(meta) } - Ok(()) + Ok(self.meta.as_mut().unwrap()) } fn _unload_metadata(&mut self) { @@ -1011,9 +1003,6 @@ impl BufferedTimeline { current_logical_size: usize, upload_relishes: bool, ) -> Result { - let current_logical_size_gauge = LOGICAL_TIMELINE_SIZE - .get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()]) - .unwrap(); let path = conf.timeline_path(&timelineid, &tenantid); let timeline = BufferedTimeline { conf, @@ -1036,7 +1025,6 @@ impl BufferedTimeline { ancestor_timeline: ancestor, ancestor_lsn: metadata.ancestor_lsn, current_logical_size: AtomicUsize::new(current_logical_size), - current_logical_size_gauge, upload_relishes, write_lock: Mutex::new(()), @@ -1063,6 +1051,7 @@ impl BufferedTimeline { // // List all relish in inclsive range [from_rel, till_rel] exists at the specfied LSN + // fn list_relishes( &self, from_rel: RelishTag, @@ -1175,6 +1164,7 @@ impl BufferedTimeline { }) .ser()?; // this MAX values allows to use this boundary as exclusive + let mut n_checkpointed_records = 0; loop { let store = self.store.read().unwrap(); @@ -1231,7 +1221,8 @@ impl BufferedTimeline { }); let mut store = self.store.write().unwrap(); - store.data.put(&key, &img?.to_vec())?; + store.data.put(&key, &PageVersion::Image(img?).ser()?)?; + n_checkpointed_records += 1; } } // Jump to next page. Setting lsn=0 and using it as exclusive boundary allows us to jump to previous page. @@ -1248,6 +1239,24 @@ impl BufferedTimeline { break; } } + let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid); + let ondisk_prev_record_lsn = self.get_prev_record_lsn(); + let metadata = TimelineMetadata { + disk_consistent_lsn: self.disk_consistent_lsn.load(), + prev_record_lsn: Some(ondisk_prev_record_lsn), + ancestor_timeline: ancestor_timelineid, + ancestor_lsn: self.ancestor_lsn, + }; + + BufferedRepository::save_metadata( + self.conf, + self.timelineid, + self.tenantid, + &metadata, + false, + )?; + trace!("Checkpoint {} records", n_checkpointed_records); + if self.upload_relishes { schedule_timeline_upload(()) // schedule_timeline_upload( @@ -1372,8 +1381,14 @@ pub enum PageVersion { Delta(WALRecord), } -impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { - fn put_wal_record(&self, lsn: Lsn, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> { +impl<'a> BufferedTimelineWriter<'a> { + fn put_page_version( + &self, + rel: RelishTag, + blknum: u32, + lsn: Lsn, + ver: PageVersion, + ) -> Result<()> { if !rel.is_blocky() && blknum != 0 { bail!( "invalid request for block {} for non-blocky relish {}", @@ -1382,24 +1397,15 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { ); } ensure!(lsn.is_aligned(), "unaligned record LSN"); - let key = StoreKey::Data(DataKey { rel, blknum, lsn }); - let value = PageVersion::Delta(rec); let mut store = self.tl.store.write().unwrap(); - store.data.put(&key.ser()?, &value.ser()?)?; + store.data.put(&key.ser()?, &ver.ser()?)?; // Update metadata - store.load_metadata()?; - if store - .meta - .as_ref() - .unwrap() - .get(&rel) - .map(|m| m.size) - .unwrap_or(0) - <= blknum - { - store.meta.as_mut().unwrap().insert( + let meta_hash = store.load_metadata()?; + let rel_size = meta_hash.get(&rel).map(|m| m.size).unwrap_or(0); + if rel_size <= blknum { + meta_hash.insert( rel, MetadataSnapshot { size: blknum + 1, @@ -1411,52 +1417,32 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { size: Some(blknum + 1), }; store.data.put(&mk.ser()?, &mv.ser()?)?; +/* + // Fill gap with zero pages + for blk in rel_size..blknum { + let key = StoreKey::Data(DataKey { + rel, + blknum: blk, + lsn, + }); + store + .data + .put(&key.ser()?, &PageVersion::Image(ZERO_PAGE.clone()).ser()?)?; + } +*/ } self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk Ok(()) } +} + +impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { + fn put_wal_record(&self, lsn: Lsn, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> { + self.put_page_version(rel, blknum, lsn, PageVersion::Delta(rec)) + } fn put_page_image(&self, rel: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> { - if !rel.is_blocky() && blknum != 0 { - bail!( - "invalid request for block {} for non-blocky relish {}", - blknum, - rel - ); - } - ensure!(lsn.is_aligned(), "unaligned record LSN"); - - let key = StoreKey::Data(DataKey { rel, blknum, lsn }); - let value = PageVersion::Image(img); - let mut store = self.tl.store.write().unwrap(); - store.data.put(&key.ser()?, &value.ser()?)?; - - // Update netadata - store.load_metadata()?; - if store - .meta - .as_ref() - .unwrap() - .get(&rel) - .map(|m| m.size) - .unwrap_or(0) - <= blknum - { - store.meta.as_mut().unwrap().insert( - rel, - MetadataSnapshot { - size: blknum + 1, - lsn, - }, - ); - let mk = StoreKey::Metadata(MetadataKey { rel, lsn }); - let mv = MetadataValue { - size: Some(blknum + 1), - }; - store.data.put(&mk.ser()?, &mv.ser()?)?; - } - self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk - Ok(()) + self.put_page_version(rel, blknum, lsn, PageVersion::Image(img)) } fn put_truncation(&self, rel: RelishTag, lsn: Lsn, relsize: u32) -> Result<()> { @@ -1468,12 +1454,8 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn); let mut store = self.tl.store.write().unwrap(); - store.load_metadata()?; - store - .meta - .as_mut() - .unwrap() - .insert(rel, MetadataSnapshot { size: relsize, lsn }); + let meta_hash = store.load_metadata()?; + meta_hash.insert(rel, MetadataSnapshot { size: relsize, lsn }); let mk = StoreKey::Metadata(MetadataKey { rel, lsn }); let mv = MetadataValue { size: Some(relsize), @@ -1489,8 +1471,8 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { trace!("drop_segment: {} at {}", rel, lsn); let mut store = self.tl.store.write().unwrap(); - store.load_metadata()?; - store.meta.as_mut().unwrap().remove(&rel); + let meta_hash = store.load_metadata()?; + meta_hash.remove(&rel); let mk = StoreKey::Metadata(MetadataKey { rel, lsn }); let mv = MetadataValue { size: None }; // None indicates dropped relation store.data.put(&mk.ser()?, &mv.ser()?)?; diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs new file mode 100644 index 0000000000..ae211fcd5e --- /dev/null +++ b/pageserver/src/toast_store.rs @@ -0,0 +1,219 @@ +use anyhow::Result; +use lz4_flex; +use serde::{Deserialize, Serialize}; +use std::ops::RangeBounds; +use std::path::Path; +use yakv::storage::{Key, Storage, StorageIterator, Value}; +use zenith_utils::bin_ser::BeSer; + +const TOAST_SEGMENT_SIZE: usize = 2 * 1024; +const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024; +const CACHE_SIZE: usize = 1024; // 8Mb + +const TOAST_VALUE_TAG: u8 = 0; +const PLAIN_VALUE_TAG: u8 = 1; + +type ToastId = u32; + +/// +/// Toast storage consistof two KV databases: one for storing main index +/// and second for storing sliced BLOB (values larger than 2kb). +/// BLOBs and main data are stored in different databases to improve +/// data locality and reduce key size for TOAST segments. +/// +pub struct ToastStore { + pub update_count: u64, + pub index: Storage, // primary storage + blobs: Storage, // storage for TOAST segments + next_id: ToastId, // counter used to identify new TOAST segments +} + +/// +/// TOAST reference +/// +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +struct ToastRef { + toast_id: ToastId, // assigned TOAST indetifier + orig_size: u32, // Original (uncompressed) value size + compressed_size: u32, // Compressed object size +} + +/// +/// Identifier of TOAST segment. +/// +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +struct ToastSegId { + toast_id: ToastId, + segno: u32, // segment number used to extract segments in proper order +} + +pub struct ToastIterator<'a> { + store: &'a ToastStore, + iter: StorageIterator<'a>, +} + +impl<'a> Iterator for ToastIterator<'a> { + type Item = Result<(Key, Value)>; + fn next(&mut self) -> Option { + self.iter.next().and_then(|res| { + Some(res.and_then(|(key, value)| Ok((key, self.store.detoast(value)?)))) + }) + } +} + +impl<'a> DoubleEndedIterator for ToastIterator<'a> { + fn next_back(&mut self) -> Option { + self.iter.next_back().and_then(|res| { + Some(res.and_then(|(key, value)| Ok((key, self.store.detoast(value)?)))) + }) + } +} + +// +// FIXME-KK: not using WAL now. Implement asynchronous or delayed commit. +// +impl ToastStore { + pub fn new(path: &Path) -> Result { + Ok(ToastStore { + update_count: 0, + index: Storage::open( + &path.join("index.db"), + None, + CACHE_SIZE, + CHECKPOINT_INTERVAL, + )?, + blobs: Storage::open( + &path.join("blobs.db"), + None, + CACHE_SIZE, + CHECKPOINT_INTERVAL, + )?, + next_id: 0, + }) + } + + pub fn put(&mut self, key: &Key, value: &Value) -> Result<()> { + let mut index_tx = self.index.start_transaction(); + let value_len = value.len(); + self.update_count += 1; + if value_len >= TOAST_SEGMENT_SIZE { + let mut blobs_tx = self.blobs.start_transaction(); + if self.next_id == 0 { + self.next_id = blobs_tx + .iter() + .next_back() + .transpose()? + .map_or(0u32, |(key, _value)| { + ToastSegId::des(&key).unwrap().toast_id + }); + } + self.next_id += 1; + let toast_id = self.next_id; + let compressed_data = lz4_flex::compress(value); + let compressed_data_len = compressed_data.len(); + let mut offs: usize = 0; + let mut segno = 0u32; + while offs + TOAST_SEGMENT_SIZE <= compressed_data_len { + blobs_tx.put( + &ToastSegId { toast_id, segno }.ser()?, + &compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(), + )?; + offs += TOAST_SEGMENT_SIZE; + segno += 1; + } + if offs < compressed_data_len { + blobs_tx.put( + &ToastSegId { toast_id, segno }.ser()?, + &compressed_data[offs..].to_vec(), + )?; + } + let mut value = ToastRef { + toast_id, + orig_size: value_len as u32, + compressed_size: compressed_data_len as u32, + } + .ser()?; + value.insert(0, TOAST_VALUE_TAG); + index_tx.put(key, &value)?; + blobs_tx.commit()?; + } else { + let mut vec = Vec::with_capacity(value.len() + 1); + vec.push(PLAIN_VALUE_TAG); + vec.extend_from_slice(&value); + index_tx.put(key, &vec)?; + } + index_tx.commit()?; + Ok(()) + } + + pub fn get(&self, key: &[u8]) -> Result> { + self.index + .get(&key.to_vec()) + .transpose() + .and_then(|res| Some(res.and_then(|value| Ok(self.detoast(value)?)))) + .transpose() + } + + pub fn iter(&self) -> ToastIterator<'_> { + self.range(..) + } + + pub fn range>(&self, range: R) -> ToastIterator<'_> { + ToastIterator { + store: self, + iter: self.index.range(range), + } + } + + pub fn remove(&self, key: &Key) -> Result<()> { + let mut index_tx = self.index.start_transaction(); + if let Some(value) = index_tx.get(key)? { + if value[0] == TOAST_VALUE_TAG { + let mut blobs_tx = self.blobs.start_transaction(); + let toast_ref = ToastRef::des(&value[1..])?; + let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1) + / TOAST_SEGMENT_SIZE) as u32; + for segno in 0..n_segments { + blobs_tx.remove( + &ToastSegId { + toast_id: toast_ref.toast_id, + segno, + } + .ser()?, + )?; + } + blobs_tx.commit()?; + } + index_tx.remove(key)?; + } + index_tx.commit()?; + Ok(()) + } + + fn detoast(&self, mut value: Value) -> Result { + if value[0] == TOAST_VALUE_TAG { + // TOAST chain + let toast_ref = ToastRef::des(&value[1..])?; + let mut toast: Value = Vec::with_capacity(toast_ref.orig_size as usize); + let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1) + / TOAST_SEGMENT_SIZE) as u32; + let from = ToastSegId { + toast_id: toast_ref.toast_id, + segno: 0, + } + .ser()?; + let till = ToastSegId { + toast_id: toast_ref.toast_id, + segno: n_segments, + } + .ser()?; + for seg in self.blobs.range(from..till) { + toast.extend_from_slice(&seg?.1); + } + Ok(lz4_flex::decompress(&toast, toast_ref.orig_size as usize)?) + } else { + value.remove(0); // remove toast tag + Ok(value) + } + } +}