diff --git a/Cargo.lock b/Cargo.lock index 36692be128..5da0e9c179 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2576,7 +2576,6 @@ dependencies = [ "anyhow", "crc32c", "fs2", - "lz4_flex", ] [[package]] diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs index 46cb7ae805..4bbd61eaff 100644 --- a/pageserver/src/buffered_repository.rs +++ b/pageserver/src/buffered_repository.rs @@ -18,8 +18,7 @@ use postgres_ffi::pg_constants::BLCKSZ; use serde::{Deserialize, Serialize}; use tracing::*; -use std::collections::HashMap; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::convert::TryInto; use std::fs; use std::fs::{File, OpenOptions}; @@ -167,12 +166,26 @@ struct MetadataSnapshot { lsn: Lsn, } +// +// Maintain in-memory map => LSN to be able to fast locate +// without scanning all storage recently updated versions which need to be materialized +// +const BRIN_SEGMENT_SIZE: u32 = 128; // 1Mb + +#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq)] +struct BrinTag { + rel: RelishTag, + seg: u32, +} + // // Relish store consists of persistent KV store and transient metadata cache loadedon demand // struct RelishStore { data: ToastStore, meta: Option>, + brin: BTreeMap, + last_checkpoint: Lsn, } /// Public interface @@ -1236,6 +1249,8 @@ impl BufferedTimeline { store: RwLock::new(RelishStore { data: ToastStore::new(&path)?, meta: None, + brin: BTreeMap::new(), + last_checkpoint: Lsn(0), }), walredo_mgr, @@ -1385,7 +1400,13 @@ impl BufferedTimeline { .ser()?; // this MAX values allows to use this boundary as exclusive let mut n_checkpointed_records = 0; - loop { + let last_checkpoint; + { + let mut store = self.store.write().unwrap(); + last_checkpoint = store.last_checkpoint; + store.last_checkpoint = self.get_last_record_lsn(); + } + 'outer: loop { let store = self.store.read().unwrap(); let mut iter = store.data.range(&from..&till); @@ -1394,6 +1415,32 @@ impl BufferedTimeline { let key = pair.0; debug_assert!(key < till); if let StoreKey::Data(dk) = StoreKey::des(&key)? { + let seg_tag = BrinTag { + rel: dk.rel, + seg: dk.blknum / BRIN_SEGMENT_SIZE, + }; + // At first iteratiob we need to scan the whole storage, because BRIN does't have enough information + if last_checkpoint != Lsn(0) + && store + .brin + .get(&seg_tag) + .map_or(true, |lsn| *lsn <= last_checkpoint) + { + // This segment was not update since last checkpoint: jump to next one + let mut iter = store.brin.range(..seg_tag); + while let Some((next_seg, lsn)) = iter.next_back() { + if *lsn > last_checkpoint { + till = StoreKey::Data(DataKey { + rel: next_seg.rel, + blknum: (next_seg.seg + 1) * BRIN_SEGMENT_SIZE, + lsn: Lsn(0), + }) + .ser()?; + continue 'outer; + } + } + break; + } let ver = PageVersion::des(&pair.1)?; if let PageVersion::Wal(rec) = ver { // ignore already materialized pages @@ -1780,6 +1827,13 @@ impl<'a> BufferedTimelineWriter<'a> { let key = StoreKey::Data(DataKey { rel, blknum, lsn }); let mut store = self.tl.store.write().unwrap(); store.data.put(&key.ser()?, &ver.ser()?)?; + store.brin.insert( + BrinTag { + rel, + seg: blknum / BRIN_SEGMENT_SIZE, + }, + lsn, + ); // Update metadata let meta_hash = store.load_metadata()?; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index e984f545bd..48be565b27 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -36,8 +36,8 @@ pub mod defaults { pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0; pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); - pub const DEFAULT_GC_HORIZON: u64 = 1024*1024; - pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(1); + pub const DEFAULT_GC_HORIZON: u64 = 1024 * 1024; + pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10000); pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100; diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs index f9afe117f7..bef7173b42 100644 --- a/pageserver/src/toast_store.rs +++ b/pageserver/src/toast_store.rs @@ -126,7 +126,7 @@ impl ToastStore { Ok(ToastStore { db: Storage::open( &path.join("pageserver.db"), - Some(&path.join("pageserver.log")), + None, // Some(&path.join("pageserver.log")), StorageConfig { cache_size: CACHE_SIZE, checkpoint_interval: CHECKPOINT_INTERVAL,