mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
Compare commits
1 Commits
buffered_r
...
buffered_r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e08ad485a |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2571,7 +2571,7 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
|
||||
|
||||
[[package]]
|
||||
name = "yakv"
|
||||
version = "0.2.1"
|
||||
version = "0.2.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"fs2",
|
||||
|
||||
@@ -38,7 +38,7 @@ const_format = "0.2.21"
|
||||
tracing = "0.1.27"
|
||||
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
|
||||
#yakv = { path = "../../yakv" }
|
||||
yakv = "0.2.2"
|
||||
yakv = "0.2.4"
|
||||
lz4_flex = "0.9.0"
|
||||
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
|
||||
@@ -187,6 +187,7 @@ struct RelishStore {
|
||||
brin: BTreeMap<BrinTag, Lsn>,
|
||||
last_checkpoint: Lsn,
|
||||
last_gc: Lsn,
|
||||
last_commit: Lsn,
|
||||
}
|
||||
|
||||
/// Public interface
|
||||
@@ -1308,6 +1309,7 @@ impl BufferedTimeline {
|
||||
brin: BTreeMap::new(),
|
||||
last_checkpoint: Lsn(0),
|
||||
last_gc: Lsn(0),
|
||||
last_commit: Lsn(0),
|
||||
}),
|
||||
|
||||
walredo_mgr,
|
||||
@@ -1591,7 +1593,7 @@ impl BufferedTimeline {
|
||||
self.reconstruct_page(dk.rel, dk.blknum, dk.lsn, data)
|
||||
});
|
||||
|
||||
let mut store = self.store.write().unwrap();
|
||||
let store = self.store.write().unwrap();
|
||||
store.data.put(key, PageVersion::Page(img?).ser()?)?;
|
||||
n_checkpointed_records += 1;
|
||||
}
|
||||
@@ -1717,7 +1719,7 @@ impl BufferedTimeline {
|
||||
if same_rel {
|
||||
// then drop previus version as it is not needed any more
|
||||
drop(store);
|
||||
let mut store = self.store.write().unwrap();
|
||||
let store = self.store.write().unwrap();
|
||||
store.data.remove(prev_key)?;
|
||||
result.meta_removed += 1;
|
||||
// We should reset iterator and start from the current point
|
||||
@@ -1728,7 +1730,7 @@ impl BufferedTimeline {
|
||||
if meta.size.is_none() {
|
||||
// object was dropped, so we can immediately remove deteriorated version
|
||||
drop(store);
|
||||
let mut store = self.store.write().unwrap();
|
||||
let store = self.store.write().unwrap();
|
||||
store.data.remove(raw_key)?;
|
||||
dropped.insert(dk.rel);
|
||||
result.meta_dropped += 1;
|
||||
@@ -1783,8 +1785,8 @@ impl BufferedTimeline {
|
||||
&& store.brin.get(&seg_tag).map_or(true, |lsn| *lsn <= last_gc)
|
||||
{
|
||||
// This segment was not update since last GC: jump to next one
|
||||
let mut iter = store.brin.range((Excluded(seg_tag), Unbounded));
|
||||
while let Some((next_seg, lsn)) = iter.next_back() {
|
||||
let iter = store.brin.range((Excluded(seg_tag), Unbounded));
|
||||
for (next_seg, lsn) in iter {
|
||||
if *lsn > last_gc {
|
||||
from = StoreKey::Data(DataKey {
|
||||
rel: next_seg.rel,
|
||||
@@ -1818,7 +1820,7 @@ impl BufferedTimeline {
|
||||
if let PageVersion::Page(_) = ver {
|
||||
// ... then remove all previously accumulated deltas and images, as them are not needed any more
|
||||
drop(store);
|
||||
let mut store = self.store.write().unwrap();
|
||||
let store = self.store.write().unwrap();
|
||||
result.pages_removed += deteriorated.len() as u64;
|
||||
for key in deteriorated {
|
||||
store.data.remove(key)?;
|
||||
@@ -1836,7 +1838,7 @@ impl BufferedTimeline {
|
||||
// we can remove all its pages
|
||||
assert!(deteriorated.is_empty()); // we should not append anything to `deteriorated` for dropped relation
|
||||
drop(store);
|
||||
let mut store = self.store.write().unwrap();
|
||||
let store = self.store.write().unwrap();
|
||||
// We should reset iterator and start from the current point
|
||||
store.data.remove(raw_key)?;
|
||||
result.pages_dropped += 1;
|
||||
@@ -1997,8 +1999,9 @@ impl<'a> BufferedTimelineWriter<'a> {
|
||||
}
|
||||
*/
|
||||
}
|
||||
if store.data.commit_lsn + self.tl.conf.checkpoint_distance < lsn {
|
||||
store.data.commit(lsn)?;
|
||||
if store.last_commit + self.tl.conf.checkpoint_distance < lsn {
|
||||
store.data.commit()?;
|
||||
store.last_commit = lsn;
|
||||
self.tl.disk_consistent_lsn.store(lsn);
|
||||
}
|
||||
Ok(())
|
||||
@@ -2102,7 +2105,8 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
|
||||
fn checkpoint(&self) -> Result<()> {
|
||||
let mut store = self.tl.store.write().unwrap();
|
||||
let lsn = self.tl.get_last_record_lsn();
|
||||
store.data.commit(lsn)?;
|
||||
store.last_commit = lsn;
|
||||
store.data.commit()?;
|
||||
self.tl.disk_consistent_lsn.store(lsn);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ pub mod defaults {
|
||||
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
|
||||
|
||||
pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024;
|
||||
pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(250);
|
||||
pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(2500);
|
||||
|
||||
pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0;
|
||||
|
||||
|
||||
@@ -3,9 +3,8 @@ use lz4_flex;
|
||||
use std::convert::TryInto;
|
||||
use std::ops::{Bound, RangeBounds};
|
||||
use std::path::Path;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value};
|
||||
use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Transaction, Value};
|
||||
|
||||
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
|
||||
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
|
||||
@@ -19,13 +18,29 @@ const CACHE_SIZE: usize = 32 * 1024; // 256Mb
|
||||
///
|
||||
pub struct ToastStore {
|
||||
db: Storage, // key-value database
|
||||
pub commit_lsn: Lsn, // LSN of last committed transaction
|
||||
}
|
||||
|
||||
pub struct ToastIterator<'a> {
|
||||
iter: StorageIterator<'a>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct PageData {
|
||||
data: [u8; 8192],
|
||||
}
|
||||
|
||||
impl PageData {
|
||||
pub fn find_first_zero_bit(&self, offs: usize) -> usize {
|
||||
let bytes = self.data;
|
||||
for i in offs..8192 {
|
||||
if bytes[i] != 0xFFu8 {
|
||||
return i * 8 + bytes[i].trailing_ones() as usize;
|
||||
}
|
||||
}
|
||||
usize::MAX
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for ToastIterator<'a> {
|
||||
type Item = Result<(Key, Value)>;
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
@@ -125,14 +140,15 @@ impl ToastStore {
|
||||
StorageConfig {
|
||||
cache_size: CACHE_SIZE,
|
||||
nosync: false,
|
||||
mursiw: true,
|
||||
},
|
||||
)?,
|
||||
commit_lsn: Lsn(0),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn put(&mut self, key: Key, value: Value) -> Result<()> {
|
||||
pub fn put(&self, key: Key, value: Value) -> Result<()> {
|
||||
let mut tx = self.db.start_transaction();
|
||||
self.tx_remove(&mut tx, &key)?;
|
||||
let value_len = value.len();
|
||||
let mut key = key;
|
||||
if value_len >= TOAST_SEGMENT_SIZE {
|
||||
@@ -146,7 +162,7 @@ impl ToastStore {
|
||||
key.extend_from_slice(&n_segments.to_be_bytes());
|
||||
key.extend_from_slice(&[0u8; 2]);
|
||||
let key_len = key.len();
|
||||
while offs + TOAST_SEGMENT_SIZE <= compressed_data_len {
|
||||
while offs + TOAST_SEGMENT_SIZE < compressed_data_len {
|
||||
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
|
||||
tx.put(
|
||||
&key,
|
||||
@@ -155,10 +171,8 @@ impl ToastStore {
|
||||
offs += TOAST_SEGMENT_SIZE;
|
||||
segno += 1;
|
||||
}
|
||||
if offs < compressed_data_len {
|
||||
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
|
||||
tx.put(&key, &compressed_data[offs..].to_vec())?;
|
||||
}
|
||||
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
|
||||
tx.put(&key, &compressed_data[offs..].to_vec())?;
|
||||
} else {
|
||||
key.extend_from_slice(&[0u8; 4]);
|
||||
tx.put(&key, &value)?;
|
||||
@@ -167,10 +181,9 @@ impl ToastStore {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn commit(&mut self, commit_lsn: Lsn) -> Result<()> {
|
||||
let mut tx = self.db.start_transaction();
|
||||
pub fn commit(&mut self) -> Result<()> {
|
||||
let tx = self.db.start_transaction();
|
||||
tx.commit()?;
|
||||
self.commit_lsn = commit_lsn;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -210,10 +223,15 @@ impl ToastStore {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, key: Key) -> Result<()> {
|
||||
pub fn remove(&self, key: Key) -> Result<()> {
|
||||
let mut tx = self.db.start_transaction();
|
||||
let mut min_key = key.clone();
|
||||
let mut max_key = key;
|
||||
self.tx_remove(&mut tx, &key)?;
|
||||
tx.delay()
|
||||
}
|
||||
|
||||
pub fn tx_remove(&self, tx: &mut Transaction, key: &[u8]) -> Result<()> {
|
||||
let mut min_key = key.to_vec();
|
||||
let mut max_key = key.to_vec();
|
||||
min_key.extend_from_slice(&[0u8; 4]);
|
||||
max_key.extend_from_slice(&[0xFFu8; 4]);
|
||||
let mut iter = tx.range(&min_key..&max_key);
|
||||
@@ -231,7 +249,10 @@ impl ToastStore {
|
||||
tx.remove(&key)?;
|
||||
}
|
||||
}
|
||||
tx.delay()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.db.get_database_info().db_used
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user