From cb6e231ea9d8a0ef6614df715bb7848bb7389cba Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Mon, 6 Dec 2021 12:24:13 +0300
Subject: [PATCH] Separate checkpointing and page reconstruction

Timed "checkpoints" now only materialize pages whose accumulated WAL history
exceeds reconstruct_threshold (reconstruct_pages). Committing the store and
saving timeline metadata is factored out into checkpoint_internal, driven by
checkpoint_distance. yakv is bumped to 0.2.7 for its snapshot and subcommit
APIs.

---
 Cargo.lock                            |   4 +-
 pageserver/Cargo.toml                 |   2 +-
 pageserver/src/buffered_repository.rs | 118 +++++++++++---------
 pageserver/src/toast_store.rs         |  23 +++--
 4 files changed, 71 insertions(+), 76 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5db2133fd6..483c5d535b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2572,9 +2572,9 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
 
 [[package]]
 name = "yakv"
-version = "0.2.6"
+version = "0.2.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3aba428668b9459f60e48264aa9742f1648d24f6a7dd83c8cc73278797131d8f"
+checksum = "17eba1abb31dda774cd901a9692a47aac716050975a30993c79826a08de47a34"
 dependencies = [
  "anyhow",
  "fs2",
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 2c6c826ef9..9c243c6881 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -38,7 +38,7 @@
 const_format = "0.2.21"
 tracing = "0.1.27"
 signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
 #yakv = { path = "../../yakv" }
-yakv = "0.2.6"
+yakv = "0.2.7"
 lz4_flex = "0.9.0"
 postgres_ffi = { path = "../postgres_ffi" }
diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs
index 234d5a8bd5..3e1c5af2ab 100644
--- a/pageserver/src/buffered_repository.rs
+++ b/pageserver/src/buffered_repository.rs
@@ -187,7 +187,6 @@ struct BufferedTimelineInner {
     brin: BTreeMap,
     last_checkpoint: Lsn,
     last_gc: Lsn,
-    last_commit: Lsn,
 }
 
 /// Public interface
@@ -416,9 +415,7 @@ impl BufferedRepository {
 
         STORAGE_TIME
             .with_label_values(&["checkpoint_timed"])
-            .observe_closure_duration(|| {
-                timeline.checkpoint_internal(conf.reconstruct_threshold, false)
-            })?
+            .observe_closure_duration(|| timeline.reconstruct_pages())?
     }
     // release lock on 'timelines'
 }
@@ -897,12 +894,13 @@ impl Timeline for BufferedTimeline {
                 Ok(ZERO_PAGE.clone())
             }
         };
-        /* // TODO: insertion materialized page in storage cause negative impact on performance.
-        if let Some(key) = reconstruct_key {
-            if let Ok(img) = &result {
-                self.store.put(&StoreKey::Data(key).ser()?, &PageVersion::Page(img.clone()).ser()?)?;
-            }
+        /*
+        // TODO: inserting the materialized page into storage has a negative impact on performance.
+        if let Some(key) = reconstruct_key {
+            if let Ok(img) = &result {
+                self.store.put(StoreKey::Data(key).ser()?, PageVersion::Page(img.clone()).ser()?)?;
             }
+        }
         */
         result
     }
@@ -985,7 +983,6 @@
 
         STORAGE_TIME
             .with_label_values(&["checkpoint_force"])
-            //pass resonstruct_threshold=0 to force page materialization
-            .observe_closure_duration(|| self.checkpoint_internal(0, true))
+            .observe_closure_duration(|| self.checkpoint_internal())
     }
 
@@ -1321,7 +1319,6 @@ impl BufferedTimeline {
             brin: BTreeMap::new(),
             last_checkpoint: Lsn(0),
             last_gc: Lsn(0),
-            last_commit: Lsn(0),
         }),
 
         walredo_mgr,
@@ -1497,15 +1494,16 @@ impl BufferedTimeline {
     }
 
     ///
-    /// Matrialize last page versions
+    /// Materialize last page versions
     ///
     /// NOTE: This has nothing to do with checkpoint in PostgreSQL.
-    /// reconstruct_threshold is used to measure total length of applied WAL records.
-    /// If it exceeds reconstruct_threshold then we replace stored WAl records with reconstructed page image.
-    /// and can be fast replayed. Alternatively we can measure interval from last version LSN:
-    /// it will enforce materialization of "stabilized" pages. But there is a risk that permanently updated page will never be materialized.
+    /// reconstruct_threshold limits the total length of WAL records accumulated for a page.
+    /// When it is exceeded, the stored WAL records are replaced with a reconstructed page
+    /// image, so the page can be replayed fast. Alternatively we could measure the interval
+    /// from the LSN of the last page version, which would enforce materialization of
+    /// "stabilized" pages, but then a permanently updated page might never be materialized.
     ///
-    fn checkpoint_internal(&self, reconstruct_threshold: u64, _forced: bool) -> Result<()> {
+    fn reconstruct_pages(&self) -> Result<()> {
         // From boundary is constant and till boundary is changed at each iteration.
         let from = StoreKey::Data(DataKey {
             rel: RelishTag::Relation(ZERO_TAG),
@@ -1526,7 +1523,7 @@ impl BufferedTimeline {
         })
         .ser()?; // this MAX values allows to use this boundary as exclusive
-        let mut n_checkpointed_records = 0;
+        let mut n_reconstructed_pages = 0;
         let last_checkpoint;
         {
             let mut inner = self.inner.write();
@@ -1611,13 +1608,13 @@
             // See comment above. May be we should also enforce here checkpointing of too old versions.
             drop(iter);
             drop(snapshot);
-            if history_len as u64 >= reconstruct_threshold {
+            if history_len as u64 >= self.conf.reconstruct_threshold {
                 let img = RECONSTRUCT_TIME.observe_closure_duration(|| {
                     self.reconstruct_page(dk.rel, dk.blknum, dk.lsn, data)
                 });
                 self.store.put(key, PageVersion::Page(img?).ser()?)?;
-                n_checkpointed_records += 1;
+                n_reconstructed_pages += 1;
             }
         }
         // Jump to next page. Setting lsn=0 and using it as exclusive boundary allows us to jump to previous page.
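Note on the policy above: reconstruct_pages walks each page's version chain and materializes an image only once the accumulated WAL history would make replay expensive. A minimal standalone sketch of that decision, using a BTreeMap as a stand-in for the yakv-backed store (types and the reconstruct callback are simplified placeholders, not the pageserver's real ones):

    use std::collections::BTreeMap;

    type Lsn = u64;

    enum PageVersion {
        Page(Vec<u8>), // materialized page image
        Wal(Vec<u8>),  // WAL record to replay on top of an older image
    }

    /// One page's versions keyed by LSN; newest entries sort last.
    fn maybe_materialize(
        versions: &mut BTreeMap<Lsn, PageVersion>,
        reconstruct_threshold: u64,
        reconstruct: impl Fn(&BTreeMap<Lsn, PageVersion>) -> Vec<u8>,
    ) {
        // Sum the WAL record lengths accumulated since the last page image.
        let mut history_len: u64 = 0;
        for version in versions.values().rev() {
            match version {
                PageVersion::Page(_) => break, // reached a materialized image
                PageVersion::Wal(rec) => history_len += rec.len() as u64,
            }
        }
        // Short chains are cheap to replay and are left alone; long ones are
        // collapsed by storing an image at the newest LSN, as the hunk above
        // does with self.store.put(key, PageVersion::Page(img?).ser()?).
        if history_len >= reconstruct_threshold {
            let img = reconstruct(versions);
            if let Some(last_lsn) = versions.keys().next_back().copied() {
                versions.insert(last_lsn, PageVersion::Page(img));
            }
        }
    }
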
@@ -1634,34 +1631,41 @@
                 break;
             }
         }
-        let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
-        let ondisk_prev_record_lsn = self.get_prev_record_lsn();
-        let metadata = TimelineMetadata {
-            disk_consistent_lsn: self.disk_consistent_lsn.load(),
-            prev_record_lsn: Some(ondisk_prev_record_lsn),
-            ancestor_timeline: ancestor_timelineid,
-            ancestor_lsn: self.ancestor_lsn,
-        };
+        info!("Reconstructed {} pages", n_reconstructed_pages);
+        self.checkpoint_internal()
+    }
 
-        BufferedRepository::save_metadata(
-            self.conf,
-            self.timelineid,
-            self.tenantid,
-            &metadata,
-            false,
-        )?;
-        info!("Checkpoint {} records", n_checkpointed_records);
+    fn checkpoint_internal(&self) -> Result<()> {
+        let lsn = self.get_last_record_lsn();
+        self.store.commit()?;
+        if self.disk_consistent_lsn.fetch_max(lsn) == lsn {
+            let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
+            let ondisk_prev_record_lsn = self.get_prev_record_lsn();
+            let metadata = TimelineMetadata {
+                disk_consistent_lsn: self.disk_consistent_lsn.load(),
+                prev_record_lsn: Some(ondisk_prev_record_lsn),
+                ancestor_timeline: ancestor_timelineid,
+                ancestor_lsn: self.ancestor_lsn,
+            };
 
-        if self.upload_relishes {
-            schedule_timeline_upload(())
-            // schedule_timeline_upload(
-            //     self.tenantid,
-            //     self.timelineid,
-            //     layer_uploads,
-            //     disk_consistent_lsn,
-            // });
+            BufferedRepository::save_metadata(
+                self.conf,
+                self.timelineid,
+                self.tenantid,
+                &metadata,
+                false,
+            )?;
+
+            if self.upload_relishes {
+                schedule_timeline_upload(())
+                // schedule_timeline_upload(
+                //     self.tenantid,
+                //     self.timelineid,
+                //     layer_uploads,
+                //     disk_consistent_lsn,
+                // });
+            }
         }
-
         Ok(())
     }
@@ -1881,7 +1885,7 @@
             }
             break;
         }
-
+        self.checkpoint_internal()?;
         result.elapsed = now.elapsed();
         Ok(result)
     }
@@ -2006,27 +2010,11 @@ impl<'a> BufferedTimelineWriter<'a> {
                 size: Some(blknum + 1),
             };
             self.tl.store.put(mk.ser()?, mv.ser()?)?;
-            inner = self.tl.inner.write();
-            /* Looks like we do not need to explicitly fill gap, because we in any case have to handle situation when
-             * page in accessed before been wal logged
-            // Fill gap with zero pages
-            for blk in rel_size..blknum {
-                let key = StoreKey::Data(DataKey {
-                    rel,
-                    blknum: blk,
-                    lsn,
-                });
-                store
-                    .data
-                    .put(&key.ser()?, &PageVersion::Image(ZERO_PAGE.clone()).ser()?)?;
-            }
-            */
-        }
-        if inner.last_commit + self.tl.conf.checkpoint_distance < lsn {
-            inner.last_commit = lsn;
+        } else {
             drop(inner);
-            self.tl.store.commit()?;
-            self.tl.disk_consistent_lsn.store(lsn);
+        }
+        if self.tl.disk_consistent_lsn.load() + self.tl.conf.checkpoint_distance < lsn {
+            self.tl.checkpoint_internal()?
         }
         Ok(())
     }
@@ -2131,11 +2119,7 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
     /// Complete all delayed commits and advance disk_consistent_lsn
     ///
    fn checkpoint(&self) -> Result<()> {
-        let lsn = self.tl.get_last_record_lsn();
-        self.tl.store.commit()?;
-        self.tl.inner.write().last_commit = lsn;
-        self.tl.disk_consistent_lsn.store(lsn);
-        Ok(())
+        self.tl.checkpoint()
     }
 
     ///
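The split in this file is now: reconstruct_pages only rewrites page versions, then delegates to checkpoint_internal, which commits the store, advances disk_consistent_lsn, and rewrites the on-disk metadata only when the durable LSN actually moved. The writer triggers the same path once the last record is more than checkpoint_distance ahead of the durable LSN. A sketch of that trigger-and-advance pattern using std's AtomicU64 (the real code uses the pageserver's AtomicLsn and TimelineMetadata; the commit/persist bodies are elided):

    use std::sync::atomic::{AtomicU64, Ordering};

    struct Timeline {
        disk_consistent_lsn: AtomicU64,
        checkpoint_distance: u64,
    }

    impl Timeline {
        /// Called after ingesting WAL: checkpoint only when enough new WAL has
        /// accumulated past the durable LSN, so store commits stay batched.
        fn maybe_checkpoint(&self, last_record_lsn: u64) {
            let durable = self.disk_consistent_lsn.load(Ordering::Relaxed);
            if durable + self.checkpoint_distance < last_record_lsn {
                self.checkpoint(last_record_lsn);
            }
        }

        fn checkpoint(&self, lsn: u64) {
            // ... commit the KV store here: everything up to `lsn` is durable ...

            // fetch_max stores max(current, lsn) and returns the value seen
            // *before* the update, so `prev < lsn` means this call advanced
            // disk_consistent_lsn and the metadata file must be rewritten.
            let prev = self.disk_consistent_lsn.fetch_max(lsn, Ordering::SeqCst);
            if prev < lsn {
                // ... save timeline metadata (disk_consistent_lsn etc.) ...
            }
        }
    }
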
diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs
index f48fbdeceb..abade0e383 100644
--- a/pageserver/src/toast_store.rs
+++ b/pageserver/src/toast_store.rs
@@ -5,13 +5,20 @@
 use std::ops::{Bound, RangeBounds};
 use std::path::Path;
 
 use yakv::storage::{
-    Key, ReadOnlyTransaction, Select, Storage, StorageConfig, StorageIterator, Transaction, Value,
+    Key,
+    Select,
+    // ReadOnlyTransaction,
+    Snapshot,
+    Storage,
+    StorageConfig,
+    StorageIterator,
+    Transaction,
+    Value,
 };
 
 const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
 const CACHE_SIZE: usize = 32 * 1024; // 256Mb
-                                     //const CACHE_SIZE: usize = 128 * 1024; // 1Gb
 
 ///
-/// Toast storage consistof two KV databases: one for storing main index
+/// Toast storage consists of two KV databases: one for storing main index
@@ -27,7 +34,8 @@ pub struct ToastIterator<'a> {
 }
 
 pub struct ToastSnapshot<'a> {
-    tx: ReadOnlyTransaction<'a>,
+    // tx: ReadOnlyTransaction<'a>,
+    tx: Snapshot<'a>,
 }
 
 impl<'a> ToastSnapshot<'a> {
@@ -205,7 +213,8 @@
             key.extend_from_slice(&[0u8; 4]);
             tx.put(&key, &value)?;
         }
-        tx.delay();
+        tx.subcommit()?;
+        //tx.delay();
         Ok(())
     }
 
@@ -217,14 +226,16 @@
 
     pub fn take_snapshot(&self) -> ToastSnapshot<'_> {
         ToastSnapshot {
-            tx: self.db.read_only_transaction(),
+            //tx: self.db.read_only_transaction(),
+            tx: self.db.take_snapshot(),
        }
    }
 
     pub fn remove(&self, key: Key) -> Result<()> {
         let mut tx = self.db.start_transaction();
         self.tx_remove(&mut tx, &key)?;
-        tx.delay();
+        tx.subcommit()?;
+        //tx.delay();
         Ok(())
     }
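For context on the store touched here: ToastStore handles large values PostgreSQL-TOAST-style, cutting them into TOAST_SEGMENT_SIZE chunks stored under the base key extended with a fixed-width segment suffix; the key.extend_from_slice(&[0u8; 4]) in the hunk above looks like such a suffix. The 0.2.7 API change itself is mechanical: tx.delay() becomes tx.subcommit()? and read_only_transaction() becomes take_snapshot(). A sketch of the segmentation idea with a plain BTreeMap standing in for yakv (key layout simplified; this is not the actual ToastStore code):

    use std::collections::BTreeMap;

    const TOAST_SEGMENT_SIZE: usize = 2 * 1024;

    /// Store `value` as ordered segments under `key`. Big-endian segment
    /// numbers keep the segments adjacent and sorted in the key space.
    fn put_segmented(store: &mut BTreeMap<Vec<u8>, Vec<u8>>, key: &[u8], value: &[u8]) {
        for (segno, chunk) in value.chunks(TOAST_SEGMENT_SIZE).enumerate() {
            let mut seg_key = key.to_vec();
            seg_key.extend_from_slice(&(segno as u32).to_be_bytes());
            store.insert(seg_key, chunk.to_vec());
        }
    }

    /// Reassemble a value by scanning the contiguous run of segment keys.
    fn get_segmented(store: &BTreeMap<Vec<u8>, Vec<u8>>, key: &[u8]) -> Vec<u8> {
        let mut start = key.to_vec();
        start.extend_from_slice(&[0u8; 4]); // first segment of this key
        let mut value = Vec::new();
        for (k, chunk) in store.range(start..) {
            if !k.starts_with(key) {
                break; // past the last segment of this key
            }
            value.extend_from_slice(chunk);
        }
        value
    }
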