diff --git a/Cargo.lock b/Cargo.lock index f842c55a12..23b6b252b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -902,9 +902,9 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb" +checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109" dependencies = [ "scopeguard", ] @@ -1190,6 +1190,7 @@ dependencies = [ "lazy_static", "log", "lz4_flex", + "parking_lot", "postgres", "postgres-protocol", "postgres-types", @@ -1215,9 +1216,9 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", @@ -1226,9 +1227,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" dependencies = [ "cfg-if 1.0.0", "instant", @@ -2571,10 +2572,11 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" [[package]] name = "yakv" -version = "0.2.1" +version = "0.2.4" dependencies = [ "anyhow", "fs2", + "parking_lot", ] [[package]] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 0e469c85ee..30d2c585e0 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -37,14 +37,15 @@ async-trait = "0.1" const_format = "0.2.21" tracing = "0.1.27" signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } -#yakv = { path = "../../yakv" } -yakv = "0.2.2" +yakv = { path = "../../yakv" } +#yakv = "0.2.4" lz4_flex = "0.9.0" postgres_ffi = { path = "../postgres_ffi" } zenith_metrics = { path = "../zenith_metrics" } zenith_utils = { path = "../zenith_utils" } workspace_hack = { path = "../workspace_hack" } +parking_lot = "0.11.2" [dev-dependencies] hex-literal = "0.3" diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs index 54b049b148..224ac8f3c4 100644 --- a/pageserver/src/buffered_repository.rs +++ b/pageserver/src/buffered_repository.rs @@ -18,6 +18,7 @@ use postgres_ffi::pg_constants::BLCKSZ; use serde::{Deserialize, Serialize}; use tracing::*; +use parking_lot::{Mutex, MutexGuard, RwLock}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::convert::TryInto; use std::fs; @@ -26,7 +27,7 @@ use std::io::Write; use std::ops::{Bound::*, Deref}; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock}; +use std::sync::Arc; use std::thread::JoinHandle; use std::time::{Duration, Instant}; @@ -181,24 +182,24 @@ struct BrinTag { // // Relish store consists of persistent KV store and transient metadata cache loadedon demand // -struct RelishStore { - data: ToastStore, +struct BufferedTimelineInner { meta: Option>, brin: BTreeMap, last_checkpoint: Lsn, last_gc: Lsn, + last_commit: Lsn, } /// Public interface impl Repository for BufferedRepository { fn get_timeline(&self, timelineid: ZTimelineId) -> Result> { - let mut timelines = self.timelines.lock().unwrap(); + let mut timelines = self.timelines.lock(); 
Ok(self.get_timeline_locked(timelineid, &mut timelines)?) } fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result> { - let mut timelines = self.timelines.lock().unwrap(); + let mut timelines = self.timelines.lock(); // Create the timeline directory, and write initial metadata to file. crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?; @@ -281,7 +282,7 @@ impl Repository for BufferedRepository { fn shutdown(&self) -> Result<()> { trace!("BufferedRepository shutdown for tenant {}", self.tenantid); - let timelines = self.timelines.lock().unwrap(); + let timelines = self.timelines.lock(); for (timelineid, timeline) in timelines.iter() { walreceiver::stop_wal_receiver(*timelineid); // Wait for syncing data to disk @@ -298,7 +299,7 @@ impl Repository for BufferedRepository { /// Private functions impl BufferedRepository { fn get_buffered_timeline(&self, timelineid: ZTimelineId) -> Result> { - let mut timelines = self.timelines.lock().unwrap(); + let mut timelines = self.timelines.lock(); self.get_timeline_locked(timelineid, &mut timelines) } @@ -404,11 +405,10 @@ impl BufferedRepository { let timelines: Vec<(ZTimelineId, Arc)> = self .timelines .lock() - .unwrap() .iter() .map(|pair| (*pair.0, pair.1.clone())) .collect(); - //let timelines = self.timelines.lock().unwrap(); + //let timelines = self.timelines.lock(); for (timelineid, timeline) in timelines.iter() { let _entered = info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid) @@ -455,7 +455,6 @@ impl BufferedRepository { let timelines: Vec<(ZTimelineId, Arc)> = self .timelines .lock() - .unwrap() .iter() .map(|pair| (*pair.0, pair.1.clone())) .collect(); @@ -613,7 +612,7 @@ impl BufferedRepository { // grab mutex to prevent new timelines from being created here. // TODO: We will hold it for a long time - //let mut timelines = self.timelines.lock().unwrap(); + //let mut timelines = self.timelines.lock(); // Scan all timelines. For each timeline, remember the timeline ID and // the branch point where it was created. @@ -725,7 +724,8 @@ pub struct BufferedTimeline { tenantid: ZTenantId, timelineid: ZTimelineId, - store: RwLock, // provide MURSIW access to the storage + inner: RwLock, + store: ToastStore, // WAL redo manager walredo_mgr: Arc, @@ -833,8 +833,8 @@ impl Timeline for BufferedTimeline { let till = StoreKey::Data(DataKey { rel, blknum, lsn }).ser()?.to_vec(); //let mut reconstruct_key: Option = None; let result = { - let store = self.store.read().unwrap(); - let mut iter = store.data.range(&from..=&till); + let snapshot = self.store.take_snapshot(); + let mut iter = snapshot.range(&from..=&till); // locate latest version with LSN <= than requested if let Some(entry) = iter.next_back() { @@ -875,9 +875,15 @@ impl Timeline for BufferedTimeline { bail!("Unexpected key type {:?}", key); } } else { - bail!("Base image not found for relish {} at {}", rel, lsn); + bail!( + "Base image not found for relish {} at {}", + dk.rel, + dk.lsn + ); } } + drop(iter); + drop(snapshot); RECONSTRUCT_TIME.observe_closure_duration(|| { self.reconstruct_page(rel, blknum, lsn, data) }) @@ -891,13 +897,10 @@ impl Timeline for BufferedTimeline { Ok(ZERO_PAGE.clone()) } }; - /* + /* // TODO: insertion materialized page in storage cause negative impact on performance. 
if let Some(key) = reconstruct_key { if let Ok(img) = &result { - let mut store = self.store.write().unwrap(); - store - .data - .put(&StoreKey::Data(key).ser()?, &PageVersion::Page(img.clone()).ser()?)?; + self.store.put(&StoreKey::Data(key).ser()?, &PageVersion::Page(img.clone()).ser()?)?; } } */ @@ -913,14 +916,16 @@ impl Timeline for BufferedTimeline { } debug_assert!(lsn <= self.get_last_record_lsn()); - let store = self.store.read().unwrap(); - // Use metadata hash only if it was loaded - if let Some(hash) = &store.meta { - if let Some(snap) = hash.get(&rel) { - // We can used cached version only of requested LSN is >= than LSN of last version. - // Otherwise extract historical value from KV storage. - if snap.lsn <= lsn { - return Ok(Some(snap.size)); + { + let inner = self.inner.read(); + // Use metadata hash only if it was loaded + if let Some(hash) = &inner.meta { + if let Some(snap) = hash.get(&rel) { + // We can used cached version only of requested LSN is >= than LSN of last version. + // Otherwise extract historical value from KV storage. + if snap.lsn <= lsn { + return Ok(Some(snap.size)); + } } } } @@ -929,7 +934,8 @@ impl Timeline for BufferedTimeline { .to_vec(); let till = StoreKey::Metadata(MetadataKey { rel, lsn }).ser()?.to_vec(); // locate last version with LSN <= than requested - let mut iter = store.data.range(&from..=&till); + let snapshot = self.store.take_snapshot(); + let mut iter = snapshot.range(&from..=&till); if let Some(pair) = iter.next_back() { let meta = MetadataValue::des(&pair?.1)?; @@ -999,9 +1005,9 @@ impl Timeline for BufferedTimeline { }); let mut relsizes: HashMap> = HashMap::new(); let mut dropped: HashSet = HashSet::new(); - let store = self.store.read().unwrap(); 'meta: loop { - let iter = store.data.range(&from.ser()?..); + let snapshot = self.store.take_snapshot(); + let iter = snapshot.range(&from.ser()?..); for entry in iter { let pair = entry?; @@ -1055,7 +1061,8 @@ impl Timeline for BufferedTimeline { let mut last_lsn = Lsn(0); let mut page_versions: Vec<(u32, Lsn, PageVersion)> = Vec::new(); 'pages: loop { - let iter = store.data.range(&from.ser()?..); + let snapshot = self.store.take_snapshot(); + let iter = snapshot.range(&from.ser()?..); for entry in iter { let pair = entry?; if let StoreKey::Data(dk) = StoreKey::des(&pair.0)? 
{ @@ -1198,7 +1205,7 @@ impl Timeline for BufferedTimeline { } fn get_current_logical_size(&self) -> usize { - self.current_logical_size.load(Ordering::Acquire) as usize + self.store.size() as usize } fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result { @@ -1231,13 +1238,16 @@ impl Timeline for BufferedTimeline { fn writer<'a>(&'a self) -> Box { Box::new(BufferedTimelineWriter { tl: self, - _write_guard: self.write_lock.lock().unwrap(), + _write_guard: self.write_lock.lock(), }) } } -impl RelishStore { - fn load_metadata(&mut self) -> Result<&'_ mut HashMap> { +impl BufferedTimelineInner { + fn get_metadata( + &mut self, + store: &ToastStore, + ) -> Result<&'_ mut HashMap> { if self.meta.is_none() { let mut meta: HashMap = HashMap::new(); let mut till = StoreKey::Metadata(MetadataKey { @@ -1245,9 +1255,12 @@ impl RelishStore { lsn: Lsn::MAX, }); loop { - let mut iter = self.data.range(..&till.ser()?); + let snapshot = store.take_snapshot(); + let mut iter = snapshot.range(..&till.ser()?); if let Some(entry) = iter.next_back() { let pair = entry?; + drop(iter); + drop(snapshot); let key = StoreKey::des(&pair.0)?; if let StoreKey::Metadata(last) = key { let metadata = MetadataValue::des(&pair.1)?; @@ -1277,7 +1290,7 @@ impl RelishStore { Ok(self.meta.as_mut().unwrap()) } - fn _unload_metadata(&mut self) { + fn _unget_metadata(&mut self) { self.meta = None; } } @@ -1302,12 +1315,13 @@ impl BufferedTimeline { conf, timelineid, tenantid, - store: RwLock::new(RelishStore { - data: ToastStore::new(&path)?, + store: ToastStore::new(&path)?, + inner: RwLock::new(BufferedTimelineInner { meta: None, brin: BTreeMap::new(), last_checkpoint: Lsn(0), last_gc: Lsn(0), + last_commit: Lsn(0), }), walredo_mgr, @@ -1369,14 +1383,16 @@ impl BufferedTimeline { }) .ser()?; // Lsn::MAX tranforms inclusive boundary to exclusive - let store = self.store.read().unwrap(); // Iterate through relish in reverse order (to locae last version) loop { // Use exclusive boundary for till to be able to skip to previous relish - let mut iter = store.data.range(&from..&till); + let snapshot = self.store.take_snapshot(); + let mut iter = snapshot.range(&from..&till); if let Some(entry) = iter.next_back() { // locate last version let pair = entry?; + drop(iter); + drop(snapshot); let key = StoreKey::des(&pair.0)?; if let StoreKey::Metadata(mk) = key { if mk.lsn <= lsn { @@ -1395,10 +1411,13 @@ impl BufferedTimeline { .ser()?; let till = StoreKey::Metadata(MetadataKey { rel: mk.rel, lsn }).ser()?; - let mut iter = store.data.range(&from..=&till); + let snapshot = self.store.take_snapshot(); + let mut iter = snapshot.range(&from..=&till); if let Some(entry) = iter.next_back() { // locate visible version let pair = entry?; + drop(iter); + drop(snapshot); let key = StoreKey::des(&pair.0)?; if let StoreKey::Metadata(mk) = key { let meta = MetadataValue::des(&pair.1)?; @@ -1443,19 +1462,24 @@ impl BufferedTimeline { } fn make_snapshot(&self) -> Result<()> { - let store = self.store.read().unwrap(); + let inner = self.inner.read(); let now = Instant::now(); - if let Some(meta_hash) = &store.meta { + if let Some(meta_hash) = &inner.meta { + // Create copy of relation map to avoid holding lock in inner for a long time + let snapshot: Vec<(RelishTag, u32)> = meta_hash + .iter() + .map(|(rel, meta)| (*rel, meta.size)) + .collect(); + drop(inner); let lsn = self.get_last_record_lsn(); - for (rel, snap) in meta_hash.iter() { - let rel_size = snap.size; - for segno in 0..(rel_size + RELISH_SEG_SIZE - 1) / RELISH_SEG_SIZE { + 
for (rel, size) in snapshot { + for segno in 0..(size + RELISH_SEG_SIZE - 1) / RELISH_SEG_SIZE { let first_blknum = segno * RELISH_SEG_SIZE; - let last_blknum = u32::min(first_blknum + RELISH_SEG_SIZE, rel_size); + let last_blknum = u32::min(first_blknum + RELISH_SEG_SIZE, size); let images: Result> = (first_blknum..last_blknum) - .map(|blknum| self.get_page_at_lsn(*rel, blknum, lsn)) + .map(|blknum| self.get_page_at_lsn(rel, blknum, lsn)) .collect(); - let segtag = SegmentTag::from_blknum(*rel, first_blknum); + let segtag = SegmentTag::from_blknum(rel, first_blknum); ImageLayer::create( self.conf, self.timelineid, @@ -1505,14 +1529,13 @@ impl BufferedTimeline { let mut n_checkpointed_records = 0; let last_checkpoint; { - let mut store = self.store.write().unwrap(); - last_checkpoint = store.last_checkpoint; - store.last_checkpoint = self.get_last_record_lsn(); + let mut inner = self.inner.write(); + last_checkpoint = inner.last_checkpoint; + inner.last_checkpoint = self.get_last_record_lsn(); } 'outer: loop { - let store = self.store.read().unwrap(); - - let mut iter = store.data.range(&from..&till); + let snapshot = self.store.take_snapshot(); + let mut iter = snapshot.range(&from..&till); if let Some(entry) = iter.next_back() { let pair = entry?; let key = pair.0; @@ -1523,26 +1546,29 @@ impl BufferedTimeline { seg: dk.blknum / BRIN_SEGMENT_SIZE, }; // At first iteration we need to scan the whole storage, because BRIN does't have enough information - if last_checkpoint != Lsn(0) - && store - .brin - .get(&seg_tag) - .map_or(true, |lsn| *lsn <= last_checkpoint) { - // This segment was not update since last checkpoint: jump to next one - let mut iter = store.brin.range(..seg_tag); - while let Some((next_seg, lsn)) = iter.next_back() { - if *lsn > last_checkpoint { - till = StoreKey::Data(DataKey { - rel: next_seg.rel, - blknum: (next_seg.seg + 1) * BRIN_SEGMENT_SIZE, - lsn: Lsn(0), - }) - .ser()?; - continue 'outer; + let inner = self.inner.read(); + if last_checkpoint != Lsn(0) + && inner + .brin + .get(&seg_tag) + .map_or(true, |lsn| *lsn <= last_checkpoint) + { + // This segment was not update since last checkpoint: jump to next one + let mut iter = inner.brin.range(..seg_tag); + while let Some((next_seg, lsn)) = iter.next_back() { + if *lsn > last_checkpoint { + till = StoreKey::Data(DataKey { + rel: next_seg.rel, + blknum: (next_seg.seg + 1) * BRIN_SEGMENT_SIZE, + lsn: Lsn(0), + }) + .ser()?; + continue 'outer; + } } + break; } - break; } let ver = PageVersion::des(&pair.1)?; if let PageVersion::Wal(rec) = ver { @@ -1582,17 +1608,15 @@ impl BufferedTimeline { bail!("Base image not found for relish {} at {}", dk.rel, dk.lsn); } } - // release locks and reconstruct page withut blocking storage - drop(iter); - drop(store); // See comment above. May be we should also enforce here checkpointing of too old versions. 
+ drop(iter); + drop(snapshot); if history_len as u64 >= reconstruct_threshold { let img = RECONSTRUCT_TIME.observe_closure_duration(|| { self.reconstruct_page(dk.rel, dk.blknum, dk.lsn, data) }); - let mut store = self.store.write().unwrap(); - store.data.put(key, PageVersion::Page(img?).ser()?)?; + self.store.put(key, PageVersion::Page(img?).ser()?)?; n_checkpointed_records += 1; } } @@ -1682,14 +1706,14 @@ impl BufferedTimeline { let last_gc; { - let mut store = self.store.write().unwrap(); - last_gc = store.last_gc; - store.last_gc = self.get_last_record_lsn(); + let mut inner = self.inner.write(); + last_gc = inner.last_gc; + inner.last_gc = self.get_last_record_lsn(); } 'meta: loop { - let store = self.store.read().unwrap(); - let iter = store.data.range(&from.ser()?..); + let snapshot = self.store.take_snapshot(); + let iter = snapshot.range(&from.ser()?..); // We can not remove deteriorated version immediately, we need to check first that successor exists let mut last_key: Option = None; @@ -1716,9 +1740,8 @@ impl BufferedTimeline { // If we are still on the same relish... if same_rel { // then drop previus version as it is not needed any more - drop(store); - let mut store = self.store.write().unwrap(); - store.data.remove(prev_key)?; + drop(snapshot); + self.store.remove(prev_key)?; result.meta_removed += 1; // We should reset iterator and start from the current point continue 'meta; @@ -1727,9 +1750,8 @@ impl BufferedTimeline { let meta = MetadataValue::des(&pair.1)?; if meta.size.is_none() { // object was dropped, so we can immediately remove deteriorated version - drop(store); - let mut store = self.store.write().unwrap(); - store.data.remove(raw_key)?; + drop(snapshot); + self.store.remove(raw_key)?; dropped.insert(dk.rel); result.meta_dropped += 1; // We should reset iterator and start from the current point @@ -1764,8 +1786,8 @@ impl BufferedTimeline { // currently proceed block number let mut from_blknum = 0; 'pages: loop { - let store = self.store.read().unwrap(); - let iter = store.data.range(&from.ser()?..); + let snapshot = self.store.take_snapshot(); + let iter = snapshot.range(&from.ser()?..); // Array to accumulate keys we can remove. 
let mut deteriorated: Vec = Vec::new(); for entry in iter { @@ -1779,22 +1801,25 @@ impl BufferedTimeline { }; // At first iteration we need to scan the whole storage, // because BRIN does't have enough information - if last_gc != Lsn(0) - && store.brin.get(&seg_tag).map_or(true, |lsn| *lsn <= last_gc) { - // This segment was not update since last GC: jump to next one - let mut iter = store.brin.range((Excluded(seg_tag), Unbounded)); - while let Some((next_seg, lsn)) = iter.next_back() { - if *lsn > last_gc { - from = StoreKey::Data(DataKey { - rel: next_seg.rel, - blknum: next_seg.seg * BRIN_SEGMENT_SIZE, - lsn: Lsn(0), - }); - continue 'pages; + let inner = self.inner.read(); + if last_gc != Lsn(0) + && inner.brin.get(&seg_tag).map_or(true, |lsn| *lsn <= last_gc) + { + // This segment was not update since last GC: jump to next one + let mut iter = inner.brin.range((Excluded(seg_tag), Unbounded)); + while let Some((next_seg, lsn)) = iter.next() { + if *lsn > last_gc { + from = StoreKey::Data(DataKey { + rel: next_seg.rel, + blknum: next_seg.seg * BRIN_SEGMENT_SIZE, + lsn: Lsn(0), + }); + continue 'pages; + } } + break; } - break; } let same_page = from_rel == dk.rel && from_blknum == dk.blknum; if !same_page { @@ -1817,11 +1842,10 @@ impl BufferedTimeline { let ver = PageVersion::des(&pair.1)?; if let PageVersion::Page(_) = ver { // ... then remove all previously accumulated deltas and images, as them are not needed any more - drop(store); - let mut store = self.store.write().unwrap(); result.pages_removed += deteriorated.len() as u64; + drop(snapshot); for key in deteriorated { - store.data.remove(key)?; + self.store.remove(key)?; } // We should reset iterator and start from the current point continue 'pages; @@ -1835,10 +1859,9 @@ impl BufferedTimeline { // This relations was dropped beyond PITR interval: // we can remove all its pages assert!(deteriorated.is_empty()); // we should not append anything to `deteriorated` for dropped relation - drop(store); - let mut store = self.store.write().unwrap(); - // We should reset iterator and start from the current point - store.data.remove(raw_key)?; + // We should reset iterator and start from the current point + drop(snapshot); + self.store.remove(raw_key)?; result.pages_dropped += 1; continue 'pages; } @@ -1956,9 +1979,9 @@ impl<'a> BufferedTimelineWriter<'a> { } ensure!(lsn.is_aligned(), "unaligned record LSN"); let key = StoreKey::Data(DataKey { rel, blknum, lsn }); - let mut store = self.tl.store.write().unwrap(); - store.data.put(key.ser()?, ver.ser()?)?; - store.brin.insert( + self.tl.store.put(key.ser()?, ver.ser()?)?; + let mut inner = self.tl.inner.write(); + inner.brin.insert( BrinTag { rel, seg: blknum / BRIN_SEGMENT_SIZE, @@ -1967,7 +1990,7 @@ impl<'a> BufferedTimelineWriter<'a> { ); // Update metadata - let meta_hash = store.load_metadata()?; + let meta_hash = inner.get_metadata(&self.tl.store)?; let rel_size = meta_hash.get(&rel).map(|m| m.size).unwrap_or(0); if rel_size <= blknum { meta_hash.insert( @@ -1977,11 +2000,13 @@ impl<'a> BufferedTimelineWriter<'a> { lsn, }, ); + drop(inner); let mk = StoreKey::Metadata(MetadataKey { rel, lsn }); let mv = MetadataValue { size: Some(blknum + 1), }; - store.data.put(mk.ser()?, mv.ser()?)?; + self.tl.store.put(mk.ser()?, mv.ser()?)?; + inner = self.tl.inner.write(); /* Looks like we do not need to explicitly fill gap, because we in any case have to handle situation when * page in accessed before been wal logged // Fill gap with zero pages @@ -1997,8 +2022,10 @@ impl<'a> 
BufferedTimelineWriter<'a> { } */ } - if store.data.commit_lsn + self.tl.conf.checkpoint_distance < lsn { - store.data.commit(lsn)?; + if inner.last_commit + self.tl.conf.checkpoint_distance < lsn { + inner.last_commit = lsn; + drop(inner); + self.tl.store.commit()?; self.tl.disk_consistent_lsn.store(lsn); } Ok(()) @@ -2073,26 +2100,30 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn); - let mut store = self.tl.store.write().unwrap(); - let meta_hash = store.load_metadata()?; - meta_hash.insert(rel, MetadataSnapshot { size: relsize, lsn }); let mk = StoreKey::Metadata(MetadataKey { rel, lsn }); let mv = MetadataValue { size: Some(relsize), }; - store.data.put(mk.ser()?, mv.ser()?)?; + self.tl.store.put(mk.ser()?, mv.ser()?)?; + + let mut inner = self.tl.inner.write(); + let meta_hash = inner.get_metadata(&self.tl.store)?; + meta_hash.insert(rel, MetadataSnapshot { size: relsize, lsn }); + Ok(()) } fn drop_relish(&self, rel: RelishTag, lsn: Lsn) -> Result<()> { trace!("drop_segment: {} at {}", rel, lsn); - let mut store = self.tl.store.write().unwrap(); - let meta_hash = store.load_metadata()?; - meta_hash.remove(&rel); let mk = StoreKey::Metadata(MetadataKey { rel, lsn }); let mv = MetadataValue { size: None }; // None indicates dropped relation - store.data.put(mk.ser()?, mv.ser()?)?; + self.tl.store.put(mk.ser()?, mv.ser()?)?; + + let mut inner = self.tl.inner.write(); + let meta_hash = inner.get_metadata(&self.tl.store)?; + meta_hash.remove(&rel); + Ok(()) } @@ -2100,9 +2131,9 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { /// Complete all delayed commits and advance disk_consistent_lsn /// fn checkpoint(&self) -> Result<()> { - let mut store = self.tl.store.write().unwrap(); let lsn = self.tl.get_last_record_lsn(); - store.data.commit(lsn)?; + self.tl.store.commit()?; + self.tl.inner.write().last_commit = lsn; self.tl.disk_consistent_lsn.store(lsn); Ok(()) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 4a89bc24b4..4306af15fb 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -37,7 +37,7 @@ pub mod defaults { pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024; - pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(250); + pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(3600); pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 41edcee164..1aad1c7d3e 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -425,11 +425,20 @@ pub fn save_decoded_record( relnode: blk.rnode_relnode, forknum: blk.forknum as u8, }); + + // + // Instead of storing full-page-image WAL record, + // it is better to store extracted image: we can skip wal-redo + // in this case. Also some FPI records may contain multiple (up to 32) pages, + // so them have to be copied multiple times. 
+ // if blk.apply_image && blk.has_image && decoded.xl_rmid == pg_constants::RM_XLOG_ID && (decoded.xl_info == pg_constants::XLOG_FPI || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) + // compression of WAL is not yet supported: fall back to storing the original WAL record + && (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0 { // Extract page image from FPI record let img_len = blk.bimg_len as usize; @@ -437,9 +446,6 @@ pub fn save_decoded_record( let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); image.extend_from_slice(&recdata[img_offs..img_offs + img_len]); - // Compression of WAL is not yet supported - assert!((blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0); - if blk.hole_length != 0 { let tail = image.split_off(blk.hole_offset as usize); image.resize(image.len() + blk.hole_length as usize, 0u8); diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs index e874ba5090..46ad00343f 100644 --- a/pageserver/src/toast_store.rs +++ b/pageserver/src/toast_store.rs @@ -3,9 +3,10 @@ use lz4_flex; use std::convert::TryInto; use std::ops::{Bound, RangeBounds}; use std::path::Path; -use zenith_utils::lsn::Lsn; -use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value}; +use yakv::storage::{ + Key, ReadOnlyTransaction, Storage, StorageConfig, StorageIterator, Transaction, Value, +}; const TOAST_SEGMENT_SIZE: usize = 2 * 1024; const CACHE_SIZE: usize = 32 * 1024; // 256Mb @@ -18,21 +19,62 @@ const CACHE_SIZE: usize = 32 * 1024; // 256Mb /// data locality and reduce key size for TOAST segments. /// pub struct ToastStore { - db: Storage, // key-value database - pub commit_lsn: Lsn, // LSN of last committed transaction + db: Storage, // key-value database } pub struct ToastIterator<'a> { iter: StorageIterator<'a>, } +pub struct ToastSnapshot<'a> { + tx: ReadOnlyTransaction<'a>, +} + +impl<'a> ToastSnapshot<'a> { + pub fn range>(&self, range: R) -> ToastIterator<'_> { + let from = match range.start_bound() { + Bound::Included(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0u8; 4]); + Bound::Included(key) + } + Bound::Excluded(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0u8; 4]); + Bound::Excluded(key) + } + _ => Bound::Unbounded, + }; + let till = match range.end_bound() { + Bound::Included(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0xFFu8; 4]); + Bound::Included(key) + } + Bound::Excluded(key) => { + let mut key = key.clone(); + key.extend_from_slice(&[0xFFu8; 4]); + Bound::Excluded(key) + } + _ => Bound::Unbounded, + }; + ToastIterator { + iter: self.tx.range((from, till)), + } + } + + pub fn iter(&self) -> ToastIterator<'_> { + self.range(..) 
+ } +} + impl<'a> Iterator for ToastIterator<'a> { type Item = Result<(Key, Value)>; fn next(&mut self) -> Option { let mut toast: Option> = None; let mut next_segno = 0u16; for elem in &mut self.iter { - if let Ok((key, value)) = elem { + let res = if let Ok((key, value)) = elem { let key_len = key.len(); let n_segments = u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap()); @@ -46,20 +88,22 @@ impl<'a> Iterator for ToastIterator<'a> { } toast.as_mut().unwrap().extend_from_slice(&value); next_segno = segno + 1; - if next_segno == n_segments { - let res = lz4_flex::decompress_size_prepended(&toast.unwrap()); - return Some(if let Ok(decompressed_data) = res { - Ok((key, decompressed_data)) - } else { - Err(anyhow!(res.unwrap_err())) - }); + if next_segno != n_segments { + continue; + } + let res = lz4_flex::decompress_size_prepended(&toast.unwrap()); + if let Ok(decompressed_data) = res { + Ok((key, decompressed_data)) + } else { + Err(anyhow!(res.unwrap_err())) } } else { - return Some(Ok((key, value))); + Ok((key, value)) } } else { - return Some(elem); - } + elem + }; + return Some(res); } assert_eq!(next_segno, 0); None @@ -125,14 +169,15 @@ impl ToastStore { StorageConfig { cache_size: CACHE_SIZE, nosync: false, + mursiw: true, }, )?, - commit_lsn: Lsn(0), }) } - pub fn put(&mut self, key: Key, value: Value) -> Result<()> { + pub fn put(&self, key: Key, value: Value) -> Result<()> { let mut tx = self.db.start_transaction(); + self.tx_remove(&mut tx, &key)?; let value_len = value.len(); let mut key = key; if value_len >= TOAST_SEGMENT_SIZE { @@ -146,7 +191,7 @@ impl ToastStore { key.extend_from_slice(&n_segments.to_be_bytes()); key.extend_from_slice(&[0u8; 2]); let key_len = key.len(); - while offs + TOAST_SEGMENT_SIZE <= compressed_data_len { + while offs + TOAST_SEGMENT_SIZE < compressed_data_len { key[key_len - 2..].copy_from_slice(&segno.to_be_bytes()); tx.put( &key, @@ -155,10 +200,8 @@ impl ToastStore { offs += TOAST_SEGMENT_SIZE; segno += 1; } - if offs < compressed_data_len { - key[key_len - 2..].copy_from_slice(&segno.to_be_bytes()); - tx.put(&key, &compressed_data[offs..].to_vec())?; - } + key[key_len - 2..].copy_from_slice(&segno.to_be_bytes()); + tx.put(&key, &compressed_data[offs..].to_vec())?; } else { key.extend_from_slice(&[0u8; 4]); tx.put(&key, &value)?; @@ -167,53 +210,27 @@ impl ToastStore { Ok(()) } - pub fn commit(&mut self, commit_lsn: Lsn) -> Result<()> { - let mut tx = self.db.start_transaction(); + pub fn commit(&self) -> Result<()> { + let tx = self.db.start_transaction(); tx.commit()?; - self.commit_lsn = commit_lsn; Ok(()) } - pub fn iter(&self) -> ToastIterator<'_> { - self.range(..) 
- } - - pub fn range>(&self, range: R) -> ToastIterator<'_> { - let from = match range.start_bound() { - Bound::Included(key) => { - let mut key = key.clone(); - key.extend_from_slice(&[0u8; 4]); - Bound::Included(key) - } - Bound::Excluded(key) => { - let mut key = key.clone(); - key.extend_from_slice(&[0u8; 4]); - Bound::Excluded(key) - } - _ => Bound::Unbounded, - }; - let till = match range.end_bound() { - Bound::Included(key) => { - let mut key = key.clone(); - key.extend_from_slice(&[0xFFu8; 4]); - Bound::Included(key) - } - Bound::Excluded(key) => { - let mut key = key.clone(); - key.extend_from_slice(&[0xFFu8; 4]); - Bound::Excluded(key) - } - _ => Bound::Unbounded, - }; - ToastIterator { - iter: self.db.range((from, till)), + pub fn take_snapshot(&self) -> ToastSnapshot<'_> { + ToastSnapshot { + tx: self.db.read_only_transaction(), } } - pub fn remove(&mut self, key: Key) -> Result<()> { + pub fn remove(&self, key: Key) -> Result<()> { let mut tx = self.db.start_transaction(); + self.tx_remove(&mut tx, &key)?; + tx.delay() + } + + pub fn tx_remove(&self, tx: &mut Transaction, key: &Key) -> Result<()> { let mut min_key = key.clone(); - let mut max_key = key; + let mut max_key = key.clone(); min_key.extend_from_slice(&[0u8; 4]); max_key.extend_from_slice(&[0xFFu8; 4]); let mut iter = tx.range(&min_key..&max_key); @@ -231,7 +248,10 @@ impl ToastStore { tx.remove(&key)?; } } - tx.delay()?; Ok(()) } + + pub fn size(&self) -> u64 { + self.db.get_database_info().db_used + } }
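Reviewer sketch (not part of the patch, illustrative names only). It shows the locking discipline this change moves to: parking_lot guards are returned directly (no poisoning, no .unwrap()), and read-side state such as the brin map in BufferedTimelineInner is copied out under a short-lived read guard that is dropped before the store is touched, mirroring how checkpoint(), gc() and make_snapshot() now behave.

use parking_lot::RwLock;
use std::collections::BTreeMap;

// Stand-ins for the patch's types; the names here are illustrative, not the real ones.
struct Inner {
    brin: BTreeMap<u32, u64>, // segment -> LSN of last update, like BufferedTimelineInner::brin
    last_checkpoint: u64,
}

// Collect the segments touched since the last checkpoint without holding the lock
// while any follow-up work on the store is done.
fn dirty_segments(inner: &RwLock<Inner>) -> Vec<u32> {
    let dirty: Vec<u32> = {
        let guard = inner.read(); // parking_lot: returns the guard directly, no unwrap()
        guard
            .brin
            .iter()
            .filter(|(_, lsn)| **lsn > guard.last_checkpoint)
            .map(|(seg, _)| *seg)
            .collect()
    }; // guard dropped here, so writers are not blocked while we act on `dirty`
    dirty
}

fn main() {
    let inner = RwLock::new(Inner {
        brin: BTreeMap::from([(0, 10), (1, 20)]),
        last_checkpoint: 15,
    });
    assert_eq!(dirty_segments(&inner), vec![1]);
    inner.write().last_checkpoint = 25; // no lock poisoning to handle on the write path
}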
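A second sketch, also standalone, shows the TOAST-style key layout that ToastStore::put and ToastIterator rely on: a 4-byte big-endian suffix (segment count, then segment number) is appended to every stored key, and values of TOAST_SEGMENT_SIZE or more are lz4-compressed and split into segments, while small values keep an all-zero suffix and are stored uncompressed. The segment-count formula and the compress_prepend_size call are inferred from the decompress path in ToastIterator, so treat them as assumptions rather than a copy of the patch.

// TOAST_SEGMENT_SIZE matches the constant in toast_store.rs.
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;

// Split one logical (key, value) pair into the per-segment pairs that would be stored.
fn toast_segments(key: &[u8], value: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
    let mut result = Vec::new();
    if value.len() >= TOAST_SEGMENT_SIZE {
        // compress_prepend_size is the counterpart of the decompress_size_prepended
        // call used by ToastIterator; its use here is an inference, not copied code.
        let compressed = lz4_flex::compress_prepend_size(value);
        let n_segments =
            ((compressed.len() + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16;
        for (segno, chunk) in compressed.chunks(TOAST_SEGMENT_SIZE).enumerate() {
            let mut seg_key = key.to_vec();
            seg_key.extend_from_slice(&n_segments.to_be_bytes());
            seg_key.extend_from_slice(&(segno as u16).to_be_bytes());
            result.push((seg_key, chunk.to_vec()));
        }
    } else {
        let mut seg_key = key.to_vec();
        seg_key.extend_from_slice(&[0u8; 4]); // zero suffix marks an inline, uncompressed value
        result.push((seg_key, value.to_vec()));
    }
    result
}

fn main() {
    let value = vec![42u8; 100 * 1024];
    let segments = toast_segments(b"page-key", &value);
    // Reading back: concatenate the segment payloads in key order, then decompress,
    // which is what ToastIterator::next does one segment at a time.
    let joined: Vec<u8> = segments.iter().flat_map(|(_, v)| v.iter().copied()).collect();
    let restored = lz4_flex::decompress_size_prepended(&joined).unwrap();
    assert_eq!(restored, value);
    println!("{} bytes stored as {} segment(s)", value.len(), segments.len());
}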