From f73d043a8bb1ae104364bfe714f386a225615cc6 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 22 Nov 2021 19:22:39 +0300 Subject: [PATCH] Use COW version of YAKV --- Cargo.lock | 3 +-- control_plane/src/compute.rs | 2 +- pageserver/Cargo.toml | 4 ++-- pageserver/src/bin/pageserver.rs | 19 +++++++++++++++ pageserver/src/buffered_repository.rs | 31 ++++++++++--------------- pageserver/src/lib.rs | 10 +++++--- pageserver/src/toast_store.rs | 33 ++++++++------------------- 7 files changed, 52 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7560ef98e5..f842c55a12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2571,10 +2571,9 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" [[package]] name = "yakv" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", - "crc32c", "fs2", ] diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 63d6d7c014..d9027f31f6 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -294,7 +294,7 @@ impl PostgresNode { conf.append("max_wal_senders", "10"); // wal_log_hints is mandatory when running against pageserver (see gh issue#192) // TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE? 
- // conf.append("wal_log_hints", "on"); + conf.append("wal_log_hints", "on"); conf.append("max_replication_slots", "10"); conf.append("hot_standby", "on"); conf.append("shared_buffers", "1MB"); diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 6a4387a4d1..52c58587f6 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -37,8 +37,8 @@ async-trait = "0.1" const_format = "0.2.21" tracing = "0.1.27" signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } -yakv = { path = "../../yakv" } -#yakv = "0.2.0" +#yakv = { path = "../../yakv" } +yakv = "0.2.1" lz4_flex = "0.9.0" postgres_ffi = { path = "../postgres_ffi" } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 3a577476dc..4020de04e9 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -42,6 +42,7 @@ struct CfgFileParams { listen_http_addr: Option, checkpoint_distance: Option, checkpoint_period: Option, + reconstruct_threshold: Option, gc_horizon: Option, gc_period: Option, pg_distrib_dir: Option, @@ -103,6 +104,7 @@ impl CfgFileParams { listen_http_addr: get_arg("listen-http"), checkpoint_distance: get_arg("checkpoint_distance"), checkpoint_period: get_arg("checkpoint_period"), + reconstruct_threshold: get_arg("reconstruct_threshold"), gc_horizon: get_arg("gc_horizon"), gc_period: get_arg("gc_period"), pg_distrib_dir: get_arg("postgres-distrib"), @@ -121,6 +123,7 @@ impl CfgFileParams { listen_http_addr: self.listen_http_addr.or(other.listen_http_addr), checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance), checkpoint_period: self.checkpoint_period.or(other.checkpoint_period), + reconstruct_threshold: self.reconstruct_threshold.or(other.reconstruct_threshold), gc_horizon: self.gc_horizon.or(other.gc_horizon), gc_period: self.gc_period.or(other.gc_period), pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir), @@ -158,6 +161,11 @@ impl CfgFileParams { None => 
DEFAULT_CHECKPOINT_PERIOD, }; + let reconstruct_threshold: u64 = match self.reconstruct_threshold.as_ref() { + Some(reconstruct_threshold_str) => reconstruct_threshold_str.parse()?, + None => DEFAULT_RECONSTRUCT_THRESHOLD, + }; + let gc_horizon: u64 = match self.gc_horizon.as_ref() { Some(horizon_str) => horizon_str.parse()?, None => DEFAULT_GC_HORIZON, @@ -236,6 +244,7 @@ impl CfgFileParams { listen_http_addr, checkpoint_distance, checkpoint_period, + reconstruct_threshold, gc_horizon, gc_period, @@ -296,6 +305,12 @@ fn main() -> Result<()> { .takes_value(true) .help("Interval between checkpoint iterations"), ) + .arg( + Arg::with_name("reconstruct_threshold") + .long("reconstruct_threshold") + .takes_value(true) + .help("Minimal size of deltas after which page reconstruction (materialization) can be performed"), + ) .arg( Arg::with_name("gc_horizon") .long("gc_horizon") @@ -600,6 +615,7 @@ mod tests { listen_http_addr: Some("listen_http_addr_VALUE".to_string()), checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()), checkpoint_period: Some("checkpoint_period_VALUE".to_string()), + reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()), gc_horizon: Some("gc_horizon_VALUE".to_string()), gc_period: Some("gc_period_VALUE".to_string()), pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()), @@ -623,6 +639,7 @@ mod tests { listen_http_addr = 'listen_http_addr_VALUE' checkpoint_distance = 'checkpoint_distance_VALUE' checkpoint_period = 'checkpoint_period_VALUE' +reconstruct_threshold = 'reconstruct_threshold_VALUE' gc_horizon = 'gc_horizon_VALUE' gc_period = 'gc_period_VALUE' pg_distrib_dir = 'pg_distrib_dir_VALUE' @@ -657,6 +674,7 @@ local_path = 'relish_storage_local_VALUE' listen_http_addr: Some("listen_http_addr_VALUE".to_string()), checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()), checkpoint_period: Some("checkpoint_period_VALUE".to_string()), + reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()), 
gc_horizon: Some("gc_horizon_VALUE".to_string()), gc_period: Some("gc_period_VALUE".to_string()), pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()), @@ -683,6 +701,7 @@ local_path = 'relish_storage_local_VALUE' listen_http_addr = 'listen_http_addr_VALUE' checkpoint_distance = 'checkpoint_distance_VALUE' checkpoint_period = 'checkpoint_period_VALUE' +reconstruct_threshold = 'reconstruct_threshold_VALUE' gc_horizon = 'gc_horizon_VALUE' gc_period = 'gc_period_VALUE' pg_distrib_dir = 'pg_distrib_dir_VALUE' diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs index 5a3738f362..18fb437e95 100644 --- a/pageserver/src/buffered_repository.rs +++ b/pageserver/src/buffered_repository.rs @@ -400,8 +400,6 @@ impl BufferedRepository { std::thread::sleep(conf.checkpoint_period); info!("checkpointer thread for tenant {} waking up", self.tenantid); - // checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE - // bytes of WAL since last checkpoint. { let timelines: Vec<(ZTimelineId, Arc)> = self .timelines @@ -419,7 +417,7 @@ impl BufferedRepository { STORAGE_TIME .with_label_values(&["checkpoint_timed"]) .observe_closure_duration(|| { - timeline.checkpoint_internal(conf.checkpoint_distance, false) + timeline.checkpoint_internal(conf.reconstruct_threshold, false) })? } // release lock on 'timelines' @@ -933,7 +931,7 @@ impl Timeline for BufferedTimeline { fn checkpoint(&self) -> Result<()> { STORAGE_TIME .with_label_values(&["checkpoint_force"]) - //pass checkpoint_distance=0 to force checkpoint + //pass reconstruct_threshold=0 to force page materialization .observe_closure_duration(|| self.checkpoint_internal(0, true)) } @@ -1375,12 +1373,12 @@ impl BufferedTimeline { /// Matrialize last page versions /// /// NOTE: This has nothing to do with checkpoint in PostgreSQL. - /// checkpoint_interval is used to measure total length of applied WAL records. 
+ /// reconstruct_threshold is used to measure total length of applied WAL records. /// It can be used to prevent to frequent materialization of page. We can avoid store materialized page if history of changes is not so long /// and can be fast replayed. Alternatively we can measure interval from last version LSN: /// it will enforce materialization of "stabilized" pages. But there is a risk that permanently updated page will never be materialized. /// - fn checkpoint_internal(&self, checkpoint_distance: u64, _forced: bool) -> Result<()> { + fn checkpoint_internal(&self, reconstruct_threshold: u64, _forced: bool) -> Result<()> { // From boundary is constant and till boundary is changed at each iteration. let from = StoreKey::Data(DataKey { rel: RelishTag::Relation(ZERO_TAG), @@ -1485,7 +1483,7 @@ impl BufferedTimeline { drop(iter); drop(store); // See comment above. May be we should also enforce here checkpointing of too old versions. - if history_len as u64 >= checkpoint_distance { + if history_len as u64 >= reconstruct_threshold { let img = RECONSTRUCT_TIME.observe_closure_duration(|| { self.reconstruct_page(dk.rel, dk.blknum, dk.lsn, data) }); @@ -1898,8 +1896,9 @@ impl<'a> BufferedTimelineWriter<'a> { } */ } - if store.data.committed { - self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk + if store.data.commit_lsn + self.tl.conf.checkpoint_distance < lsn { + store.data.commit(lsn)?; + self.tl.disk_consistent_lsn.store(lsn); } Ok(()) } @@ -1981,9 +1980,6 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { size: Some(relsize), }; store.data.put(&mk.ser()?, &mv.ser()?)?; - if store.data.committed { - self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk - } Ok(()) } @@ -1996,10 +1992,6 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { let mk = StoreKey::Metadata(MetadataKey { rel, lsn }); let mv = MetadataValue { size: None }; // None indicates dropped relation store.data.put(&mk.ser()?, 
&mv.ser()?)?; - - if store.data.committed { - self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk - } Ok(()) } @@ -2007,11 +1999,12 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { /// Complete all delayed commits and advance disk_consistent_lsn /// fn checkpoint(&self) -> Result<()> { - let store = self.tl.store.write().unwrap(); - store.data.checkpoint()?; + let mut store = self.tl.store.write().unwrap(); + let lsn = self.tl.get_last_record_lsn(); + store.data.commit(lsn)?; self.tl .disk_consistent_lsn - .store(self.tl.get_last_record_lsn()); + .store(lsn); Ok(()) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index c5200c909d..da5045657f 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -33,11 +33,13 @@ pub mod defaults { pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); // Minimal size of WAL records chain to trigger materialization of the page - pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0; - pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); + pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024; + pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10); + + pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0; pub const DEFAULT_GC_HORIZON: u64 = 1024; - pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(1); + pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100; @@ -64,6 +66,7 @@ pub struct PageServerConf { // page server crashes. 
pub checkpoint_distance: u64, pub checkpoint_period: Duration, + pub reconstruct_threshold: u64, pub gc_horizon: u64, pub gc_period: Duration, @@ -149,6 +152,7 @@ impl PageServerConf { daemonize: false, checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, checkpoint_period: Duration::from_secs(10), + reconstruct_threshold: defaults::DEFAULT_RECONSTRUCT_THRESHOLD, gc_horizon: defaults::DEFAULT_GC_HORIZON, gc_period: Duration::from_secs(10), listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs index 6c6d0f9ef1..a18e714776 100644 --- a/pageserver/src/toast_store.rs +++ b/pageserver/src/toast_store.rs @@ -4,13 +4,13 @@ use std::convert::TryInto; use std::ops::{Bound, RangeBounds}; use std::path::Path; use tracing::*; +use zenith_utils::lsn::Lsn; + use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value}; const TOAST_SEGMENT_SIZE: usize = 2 * 1024; -const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024; const CACHE_SIZE: usize = 32 * 1024; // 256Mb -const COMMIT_THRESHOLD: usize = CACHE_SIZE / 2; -const WAL_FLUSH_THRESHOLD: u32 = 128; // 1Mb +//const CACHE_SIZE: usize = 128 * 1024; // 1Gb /// /// Toast storage consistof two KV databases: one for storing main index @@ -20,7 +20,7 @@ const WAL_FLUSH_THRESHOLD: u32 = 128; // 1Mb /// pub struct ToastStore { db: Storage, // key-value database - pub committed: bool, // last transaction was committed (not delayed) + pub commit_lsn: Lsn, // LSN of last committed transaction } pub struct ToastIterator<'a> { @@ -126,14 +126,12 @@ impl ToastStore { Ok(ToastStore { db: Storage::open( &path.join("pageserver.db"), - Some(&path.join("pageserver.log")), StorageConfig { cache_size: CACHE_SIZE, - checkpoint_interval: CHECKPOINT_INTERVAL, - wal_flush_threshold: WAL_FLUSH_THRESHOLD, + nosync: false, }, )?, - committed: false, + commit_lsn: Lsn(0), }) } @@ -141,7 +139,6 @@ impl ToastStore { let mut tx = 
self.db.start_transaction(); let value_len = value.len(); let mut key = key.clone(); - self.committed = false; if value_len >= TOAST_SEGMENT_SIZE { let compressed_data = lz4_flex::compress_prepend_size(value); let compressed_data_len = compressed_data.len(); @@ -170,18 +167,14 @@ impl ToastStore { key.extend_from_slice(&[0u8; 4]); tx.put(&key, value)?; } - if tx.get_cache_info().pinned > COMMIT_THRESHOLD { - tx.commit()?; - self.committed = true; - } else { - tx.delay()?; - } + tx.delay()?; Ok(()) } - pub fn checkpoint(&self) -> Result<()> { + pub fn commit(&mut self, commit_lsn: Lsn) -> Result<()> { let mut tx = self.db.start_transaction(); tx.commit()?; + self.commit_lsn = commit_lsn; Ok(()) } @@ -228,7 +221,6 @@ impl ToastStore { min_key.extend_from_slice(&[0u8; 4]); max_key.extend_from_slice(&[0xFFu8; 4]); let mut iter = tx.range(&min_key..&max_key); - self.committed = false; if let Some(entry) = iter.next() { let mut key = entry?.0.clone(); let key_len = key.len(); @@ -243,12 +235,7 @@ impl ToastStore { tx.remove(&key)?; } } - if tx.get_cache_info().pinned > COMMIT_THRESHOLD { - tx.commit()?; - self.committed = true; - } else { - tx.delay()?; - } + tx.delay()?; Ok(()) } }