Compare commits

...

1 Commits

Author SHA1 Message Date
Konstantin Knizhnik
3e08ad485a Fix bug in using brin index in GC 2021-11-30 16:33:50 +03:00
5 changed files with 55 additions and 30 deletions

2
Cargo.lock generated
View File

@@ -2571,7 +2571,7 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.2.1"
version = "0.2.4"
dependencies = [
"anyhow",
"fs2",

View File

@@ -38,7 +38,7 @@ const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
#yakv = { path = "../../yakv" }
yakv = "0.2.2"
yakv = "0.2.4"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -187,6 +187,7 @@ struct RelishStore {
brin: BTreeMap<BrinTag, Lsn>,
last_checkpoint: Lsn,
last_gc: Lsn,
last_commit: Lsn,
}
/// Public interface
@@ -1308,6 +1309,7 @@ impl BufferedTimeline {
brin: BTreeMap::new(),
last_checkpoint: Lsn(0),
last_gc: Lsn(0),
last_commit: Lsn(0),
}),
walredo_mgr,
@@ -1591,7 +1593,7 @@ impl BufferedTimeline {
self.reconstruct_page(dk.rel, dk.blknum, dk.lsn, data)
});
let mut store = self.store.write().unwrap();
let store = self.store.write().unwrap();
store.data.put(key, PageVersion::Page(img?).ser()?)?;
n_checkpointed_records += 1;
}
@@ -1717,7 +1719,7 @@ impl BufferedTimeline {
if same_rel {
// then drop previus version as it is not needed any more
drop(store);
let mut store = self.store.write().unwrap();
let store = self.store.write().unwrap();
store.data.remove(prev_key)?;
result.meta_removed += 1;
// We should reset iterator and start from the current point
@@ -1728,7 +1730,7 @@ impl BufferedTimeline {
if meta.size.is_none() {
// object was dropped, so we can immediately remove deteriorated version
drop(store);
let mut store = self.store.write().unwrap();
let store = self.store.write().unwrap();
store.data.remove(raw_key)?;
dropped.insert(dk.rel);
result.meta_dropped += 1;
@@ -1783,8 +1785,8 @@ impl BufferedTimeline {
&& store.brin.get(&seg_tag).map_or(true, |lsn| *lsn <= last_gc)
{
// This segment was not update since last GC: jump to next one
let mut iter = store.brin.range((Excluded(seg_tag), Unbounded));
while let Some((next_seg, lsn)) = iter.next_back() {
let iter = store.brin.range((Excluded(seg_tag), Unbounded));
for (next_seg, lsn) in iter {
if *lsn > last_gc {
from = StoreKey::Data(DataKey {
rel: next_seg.rel,
@@ -1818,7 +1820,7 @@ impl BufferedTimeline {
if let PageVersion::Page(_) = ver {
// ... then remove all previously accumulated deltas and images, as them are not needed any more
drop(store);
let mut store = self.store.write().unwrap();
let store = self.store.write().unwrap();
result.pages_removed += deteriorated.len() as u64;
for key in deteriorated {
store.data.remove(key)?;
@@ -1836,7 +1838,7 @@ impl BufferedTimeline {
// we can remove all its pages
assert!(deteriorated.is_empty()); // we should not append anything to `deteriorated` for dropped relation
drop(store);
let mut store = self.store.write().unwrap();
let store = self.store.write().unwrap();
// We should reset iterator and start from the current point
store.data.remove(raw_key)?;
result.pages_dropped += 1;
@@ -1997,8 +1999,9 @@ impl<'a> BufferedTimelineWriter<'a> {
}
*/
}
if store.data.commit_lsn + self.tl.conf.checkpoint_distance < lsn {
store.data.commit(lsn)?;
if store.last_commit + self.tl.conf.checkpoint_distance < lsn {
store.data.commit()?;
store.last_commit = lsn;
self.tl.disk_consistent_lsn.store(lsn);
}
Ok(())
@@ -2102,7 +2105,8 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
fn checkpoint(&self) -> Result<()> {
let mut store = self.tl.store.write().unwrap();
let lsn = self.tl.get_last_record_lsn();
store.data.commit(lsn)?;
store.last_commit = lsn;
store.data.commit()?;
self.tl.disk_consistent_lsn.store(lsn);
Ok(())
}

View File

@@ -37,7 +37,7 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(250);
pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(2500);
pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0;

View File

@@ -3,9 +3,8 @@ use lz4_flex;
use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use zenith_utils::lsn::Lsn;
use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value};
use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Transaction, Value};
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
@@ -19,13 +18,29 @@ const CACHE_SIZE: usize = 32 * 1024; // 256Mb
///
pub struct ToastStore {
db: Storage, // key-value database
pub commit_lsn: Lsn, // LSN of last committed transaction
}
pub struct ToastIterator<'a> {
iter: StorageIterator<'a>,
}
#[derive(Clone, Copy)]
pub struct PageData {
data: [u8; 8192],
}
impl PageData {
pub fn find_first_zero_bit(&self, offs: usize) -> usize {
let bytes = self.data;
for i in offs..8192 {
if bytes[i] != 0xFFu8 {
return i * 8 + bytes[i].trailing_ones() as usize;
}
}
usize::MAX
}
}
impl<'a> Iterator for ToastIterator<'a> {
type Item = Result<(Key, Value)>;
fn next(&mut self) -> Option<Self::Item> {
@@ -125,14 +140,15 @@ impl ToastStore {
StorageConfig {
cache_size: CACHE_SIZE,
nosync: false,
mursiw: true,
},
)?,
commit_lsn: Lsn(0),
})
}
pub fn put(&mut self, key: Key, value: Value) -> Result<()> {
pub fn put(&self, key: Key, value: Value) -> Result<()> {
let mut tx = self.db.start_transaction();
self.tx_remove(&mut tx, &key)?;
let value_len = value.len();
let mut key = key;
if value_len >= TOAST_SEGMENT_SIZE {
@@ -146,7 +162,7 @@ impl ToastStore {
key.extend_from_slice(&n_segments.to_be_bytes());
key.extend_from_slice(&[0u8; 2]);
let key_len = key.len();
while offs + TOAST_SEGMENT_SIZE <= compressed_data_len {
while offs + TOAST_SEGMENT_SIZE < compressed_data_len {
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(
&key,
@@ -155,10 +171,8 @@ impl ToastStore {
offs += TOAST_SEGMENT_SIZE;
segno += 1;
}
if offs < compressed_data_len {
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(&key, &compressed_data[offs..].to_vec())?;
}
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(&key, &compressed_data[offs..].to_vec())?;
} else {
key.extend_from_slice(&[0u8; 4]);
tx.put(&key, &value)?;
@@ -167,10 +181,9 @@ impl ToastStore {
Ok(())
}
pub fn commit(&mut self, commit_lsn: Lsn) -> Result<()> {
let mut tx = self.db.start_transaction();
pub fn commit(&mut self) -> Result<()> {
let tx = self.db.start_transaction();
tx.commit()?;
self.commit_lsn = commit_lsn;
Ok(())
}
@@ -210,10 +223,15 @@ impl ToastStore {
}
}
pub fn remove(&mut self, key: Key) -> Result<()> {
pub fn remove(&self, key: Key) -> Result<()> {
let mut tx = self.db.start_transaction();
let mut min_key = key.clone();
let mut max_key = key;
self.tx_remove(&mut tx, &key)?;
tx.delay()
}
pub fn tx_remove(&self, tx: &mut Transaction, key: &[u8]) -> Result<()> {
let mut min_key = key.to_vec();
let mut max_key = key.to_vec();
min_key.extend_from_slice(&[0u8; 4]);
max_key.extend_from_slice(&[0xFFu8; 4]);
let mut iter = tx.range(&min_key..&max_key);
@@ -231,7 +249,10 @@ impl ToastStore {
tx.remove(&key)?;
}
}
tx.delay()?;
Ok(())
}
pub fn size(&self) -> u64 {
self.db.get_database_info().db_used
}
}