mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 05:30:37 +00:00
Add BRIN index for checkpointer
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2576,7 +2576,6 @@ dependencies = [
|
||||
"anyhow",
|
||||
"crc32c",
|
||||
"fs2",
|
||||
"lz4_flex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -18,8 +18,7 @@ use postgres_ffi::pg_constants::BLCKSZ;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::*;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
|
||||
use std::convert::TryInto;
|
||||
use std::fs;
|
||||
use std::fs::{File, OpenOptions};
|
||||
@@ -167,12 +166,26 @@ struct MetadataSnapshot {
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
//
|
||||
// Maintain in-memory map <religh,segment> => LSN to be able to fast locate
|
||||
// without scanning all storage recently updated versions which need to be materialized
|
||||
//
|
||||
const BRIN_SEGMENT_SIZE: u32 = 128; // 1Mb
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq)]
|
||||
struct BrinTag {
|
||||
rel: RelishTag,
|
||||
seg: u32,
|
||||
}
|
||||
|
||||
//
|
||||
// Relish store consists of persistent KV store and transient metadata cache loadedon demand
|
||||
//
|
||||
struct RelishStore {
|
||||
data: ToastStore,
|
||||
meta: Option<HashMap<RelishTag, MetadataSnapshot>>,
|
||||
brin: BTreeMap<BrinTag, Lsn>,
|
||||
last_checkpoint: Lsn,
|
||||
}
|
||||
|
||||
/// Public interface
|
||||
@@ -1236,6 +1249,8 @@ impl BufferedTimeline {
|
||||
store: RwLock::new(RelishStore {
|
||||
data: ToastStore::new(&path)?,
|
||||
meta: None,
|
||||
brin: BTreeMap::new(),
|
||||
last_checkpoint: Lsn(0),
|
||||
}),
|
||||
|
||||
walredo_mgr,
|
||||
@@ -1385,7 +1400,13 @@ impl BufferedTimeline {
|
||||
.ser()?; // this MAX values allows to use this boundary as exclusive
|
||||
|
||||
let mut n_checkpointed_records = 0;
|
||||
loop {
|
||||
let last_checkpoint;
|
||||
{
|
||||
let mut store = self.store.write().unwrap();
|
||||
last_checkpoint = store.last_checkpoint;
|
||||
store.last_checkpoint = self.get_last_record_lsn();
|
||||
}
|
||||
'outer: loop {
|
||||
let store = self.store.read().unwrap();
|
||||
|
||||
let mut iter = store.data.range(&from..&till);
|
||||
@@ -1394,6 +1415,32 @@ impl BufferedTimeline {
|
||||
let key = pair.0;
|
||||
debug_assert!(key < till);
|
||||
if let StoreKey::Data(dk) = StoreKey::des(&key)? {
|
||||
let seg_tag = BrinTag {
|
||||
rel: dk.rel,
|
||||
seg: dk.blknum / BRIN_SEGMENT_SIZE,
|
||||
};
|
||||
// At first iteratiob we need to scan the whole storage, because BRIN does't have enough information
|
||||
if last_checkpoint != Lsn(0)
|
||||
&& store
|
||||
.brin
|
||||
.get(&seg_tag)
|
||||
.map_or(true, |lsn| *lsn <= last_checkpoint)
|
||||
{
|
||||
// This segment was not update since last checkpoint: jump to next one
|
||||
let mut iter = store.brin.range(..seg_tag);
|
||||
while let Some((next_seg, lsn)) = iter.next_back() {
|
||||
if *lsn > last_checkpoint {
|
||||
till = StoreKey::Data(DataKey {
|
||||
rel: next_seg.rel,
|
||||
blknum: (next_seg.seg + 1) * BRIN_SEGMENT_SIZE,
|
||||
lsn: Lsn(0),
|
||||
})
|
||||
.ser()?;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
let ver = PageVersion::des(&pair.1)?;
|
||||
if let PageVersion::Wal(rec) = ver {
|
||||
// ignore already materialized pages
|
||||
@@ -1780,6 +1827,13 @@ impl<'a> BufferedTimelineWriter<'a> {
|
||||
let key = StoreKey::Data(DataKey { rel, blknum, lsn });
|
||||
let mut store = self.tl.store.write().unwrap();
|
||||
store.data.put(&key.ser()?, &ver.ser()?)?;
|
||||
store.brin.insert(
|
||||
BrinTag {
|
||||
rel,
|
||||
seg: blknum / BRIN_SEGMENT_SIZE,
|
||||
},
|
||||
lsn,
|
||||
);
|
||||
|
||||
// Update metadata
|
||||
let meta_hash = store.load_metadata()?;
|
||||
|
||||
@@ -36,8 +36,8 @@ pub mod defaults {
|
||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0;
|
||||
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 1024*1024;
|
||||
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(1);
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 1024 * 1024;
|
||||
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10000);
|
||||
|
||||
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
|
||||
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
|
||||
|
||||
@@ -126,7 +126,7 @@ impl ToastStore {
|
||||
Ok(ToastStore {
|
||||
db: Storage::open(
|
||||
&path.join("pageserver.db"),
|
||||
Some(&path.join("pageserver.log")),
|
||||
None, // Some(&path.join("pageserver.log")),
|
||||
StorageConfig {
|
||||
cache_size: CACHE_SIZE,
|
||||
checkpoint_interval: CHECKPOINT_INTERVAL,
|
||||
|
||||
Reference in New Issue
Block a user