Use delayed commit in buffered_repo

Konstantin Knizhnik
2021-10-27 19:37:23 +03:00
parent 497258c6fe
commit ce779cc754
7 changed files with 134 additions and 62 deletions
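The gist of the change: instead of flushing every update to disk, the buffered repository now uses yakv's delayed commits. Each put/remove starts a transaction and, depending on how many cache pages are pinned, either commits (making the update durable) or delays the commit; disk_consistent_lsn is advanced only when a real commit happened, and a new checkpoint() method completes all delayed commits. A minimal sketch of that decision, assuming the yakv transaction calls used in the diff (start_transaction, get_cache_info().pinned, commit, delay) behave as shown; the function name and signature are illustrative only:

use anyhow::Result;
use yakv::storage::Storage;

// Sketch only: mirrors the commit-or-delay pattern from ToastStore::put below.
// `commit_threshold` plays the role of MAIN_COMMIT_THRESHOLD.
fn store_update(main: &Storage, key: Vec<u8>, value: Vec<u8>, commit_threshold: usize) -> Result<bool> {
    let mut tx = main.start_transaction();
    tx.put(&key, &value)?;
    if tx.get_cache_info().pinned > commit_threshold {
        tx.commit()?; // flushed to disk: the update is durable now
        Ok(true)      // caller may advance disk_consistent_lsn
    } else {
        tx.delay()?;  // keep the change pinned in cache; a later commit or checkpoint flushes it
        Ok(false)     // caller must not advance disk_consistent_lsn yet
    }
}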

Cargo.lock (generated)
View File

@@ -2571,9 +2571,7 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54d2b862dd7bc9ac752c4642a0fb126dc17cb3895e20d25135f3f5052fa3000b"
version = "0.1.6"
dependencies = [
"anyhow",
"crc32c",

View File

@@ -38,7 +38,7 @@ const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
#yakv = { path = "../../yakv" }
yakv = "0.1.5"
yakv = "0.1.6"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -1417,21 +1417,24 @@ impl<'a> BufferedTimelineWriter<'a> {
size: Some(blknum + 1),
};
store.data.put(&mk.ser()?, &mv.ser()?)?;
/*
// Fill gap with zero pages
for blk in rel_size..blknum {
let key = StoreKey::Data(DataKey {
rel,
blknum: blk,
lsn,
});
store
.data
.put(&key.ser()?, &PageVersion::Image(ZERO_PAGE.clone()).ser()?)?;
}
*/
/* Looks like we do not need to explicitly fill the gap, because in any case we have to handle the situation when
* a page is accessed before being WAL-logged.
// Fill gap with zero pages
for blk in rel_size..blknum {
let key = StoreKey::Data(DataKey {
rel,
blknum: blk,
lsn,
});
store
.data
.put(&key.ser()?, &PageVersion::Image(ZERO_PAGE.clone()).ser()?)?;
}
*/
}
if store.data.committed {
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
}
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
Ok(())
}
}
@@ -1461,9 +1464,9 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
size: Some(relsize),
};
store.data.put(&mk.ser()?, &mv.ser()?)?;
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
if store.data.committed {
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
}
Ok(())
}
@@ -1477,17 +1480,26 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
let mv = MetadataValue { size: None }; // None indicates dropped relation
store.data.put(&mk.ser()?, &mv.ser()?)?;
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
if store.data.committed {
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
}
Ok(())
}
///
/// Complete all delayed commits and advance disk_consistent_lsn
///
fn checkpoint(&self) -> Result<()> {
let store = self.tl.store.write().unwrap();
store.data.checkpoint()?;
self.tl.disk_consistent_lsn.store(self.tl.get_last_record_lsn());
Ok(())
}
///
/// Remember the (end of the) last valid WAL record in the timeline.
///
fn advance_last_record_lsn(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());
self.tl.last_record_lsn.advance(new_lsn);
}
}
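With delayed commits the on-disk state may lag behind memory: last_record_lsn always advances, while disk_consistent_lsn moves only when store.data.committed is true or when checkpoint() forces a flush. A hedged usage sketch, where apply_record stands in for the writer's concrete put methods and records is a hypothetical iterator of decoded WAL records:

// Sketch: apply a batch of records with delayed commits, then checkpoint once.
for (lsn, record) in records {
    writer.apply_record(lsn, record)?;   // may commit or merely delay inside the store
    writer.advance_last_record_lsn(lsn); // the in-memory LSN always advances
}
writer.checkpoint()?;                    // complete delayed commits, catch disk_consistent_lsn up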

View File

@@ -33,7 +33,7 @@ pub mod defaults {
// Minimal size of the WAL record chain that triggers materialization of the page
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_millis(1000);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);

View File

@@ -179,6 +179,11 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
/// Advance requires aligned LSN as an argument and would wake wait_lsn() callers.
/// Previous last record LSN is stored alongside the latest and can be read.
fn advance_last_record_lsn(&self, lsn: Lsn);
///
/// Complete all delayed commits and advance disk_consistent_lsn
///
fn checkpoint(&self) -> Result<()>;
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

View File

@@ -126,7 +126,6 @@ pub fn import_timeline_from_postgres_datadir(
import_nonrel_file(writer, lsn, RelishTag::TwoPhase { xid }, &entry.path())?;
}
// TODO: Scan pg_tblspc
writer.advance_last_record_lsn(lsn);
// Import WAL. This is needed even when starting from a shutdown checkpoint, because
@@ -140,6 +139,7 @@ pub fn import_timeline_from_postgres_datadir(
lsn,
&mut pg_control.checkPointCopy.clone(),
)?;
writer.checkpoint()?;
Ok(())
}

View File

@@ -5,10 +5,14 @@ use std::ops::RangeBounds;
use std::path::Path;
use yakv::storage::{Key, Storage, StorageIterator, Value};
use zenith_utils::bin_ser::BeSer;
use tracing::*;
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
const CACHE_SIZE: usize = 1024; // 8Mb
const MAIN_CACHE_SIZE: usize = 8 * 1024; // 64Mb
const TOAST_CACHE_SIZE: usize = 1024; // 8Mb
const MAIN_COMMIT_THRESHOLD: usize = MAIN_CACHE_SIZE / 2;
const TOAST_COMMIT_THRESHOLD: usize = TOAST_CACHE_SIZE / 2;
const TOAST_VALUE_TAG: u8 = 0;
const PLAIN_VALUE_TAG: u8 = 1;
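The cache sizes are given in pages: with 8 KiB yakv pages (an assumption consistent with the comments), MAIN_CACHE_SIZE = 8 * 1024 pages is the 64 MB noted above and TOAST_CACHE_SIZE = 1024 pages is 8 MB; the commit thresholds are half of each cache, so a transaction is committed once more than half of the cache pages are pinned. The arithmetic, spelled out:

// Sizing implied by the comments above (assumes 8 KiB yakv cache pages).
const PAGE_SIZE: usize = 8 * 1024;                             // 8 KiB per page (assumption)
const MAIN_CACHE_BYTES: usize = MAIN_CACHE_SIZE * PAGE_SIZE;   // 8192 pages * 8 KiB = 64 MB
const TOAST_CACHE_BYTES: usize = TOAST_CACHE_SIZE * PAGE_SIZE; // 1024 pages * 8 KiB = 8 MB
// Thresholds are half of each cache: delayed changes pin at most ~32 MB (main) / ~4 MB (toast)
// before a put/remove is forced to commit.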
@@ -22,10 +26,10 @@ type ToastId = u32;
/// data locality and reduce key size for TOAST segments.
///
pub struct ToastStore {
pub update_count: u64,
pub index: Storage, // primary storage
blobs: Storage, // storage for TOAST segments
next_id: ToastId, // counter used to identify new TOAST segments
main: Storage, // primary storage
toast: Storage, // storage for TOAST segments
next_id: ToastId, // counter used to identify new TOAST segments
pub committed: bool, // last transaction was committed (not delayed)
}
///
@@ -75,31 +79,31 @@ impl<'a> DoubleEndedIterator for ToastIterator<'a> {
impl ToastStore {
pub fn new(path: &Path) -> Result<ToastStore> {
Ok(ToastStore {
update_count: 0,
index: Storage::open(
&path.join("index.db"),
None,
CACHE_SIZE,
main: Storage::open(
&path.join("main.db"),
Some(&path.join("main.log")),
MAIN_CACHE_SIZE,
CHECKPOINT_INTERVAL,
)?,
blobs: Storage::open(
&path.join("blobs.db"),
None,
CACHE_SIZE,
toast: Storage::open(
&path.join("toast.db"),
Some(&path.join("toast.log")),
TOAST_CACHE_SIZE,
CHECKPOINT_INTERVAL,
)?,
next_id: 0,
committed: false,
})
}
pub fn put(&mut self, key: &Key, value: &Value) -> Result<()> {
let mut index_tx = self.index.start_transaction();
let mut main_tx = self.main.start_transaction();
let value_len = value.len();
self.update_count += 1;
let main_pinned;
if value_len >= TOAST_SEGMENT_SIZE {
let mut blobs_tx = self.blobs.start_transaction();
let mut toast_tx = self.toast.start_transaction();
if self.next_id == 0 {
self.next_id = blobs_tx
self.next_id = toast_tx
.iter()
.next_back()
.transpose()?
@@ -114,7 +118,7 @@ impl ToastStore {
let mut offs: usize = 0;
let mut segno = 0u32;
while offs + TOAST_SEGMENT_SIZE <= compressed_data_len {
blobs_tx.put(
toast_tx.put(
&ToastSegId { toast_id, segno }.ser()?,
&compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(),
)?;
@@ -122,7 +126,7 @@ impl ToastStore {
segno += 1;
}
if offs < compressed_data_len {
blobs_tx.put(
toast_tx.put(
&ToastSegId { toast_id, segno }.ser()?,
&compressed_data[offs..].to_vec(),
)?;
@@ -134,20 +138,43 @@ impl ToastStore {
}
.ser()?;
value.insert(0, TOAST_VALUE_TAG);
index_tx.put(key, &value)?;
blobs_tx.commit()?;
main_tx.put(key, &value)?;
main_pinned = main_tx.get_cache_info().pinned;
// If we are going to commit main storage, then we have to commit toast storage first to avoid dangling references
if main_pinned > MAIN_COMMIT_THRESHOLD
|| toast_tx.get_cache_info().pinned > TOAST_COMMIT_THRESHOLD
{
toast_tx.commit()?;
} else {
toast_tx.delay()?;
}
} else {
let mut vec = Vec::with_capacity(value.len() + 1);
vec.push(PLAIN_VALUE_TAG);
vec.extend_from_slice(&value);
index_tx.put(key, &vec)?;
main_tx.put(key, &vec)?;
main_pinned = main_tx.get_cache_info().pinned;
}
if main_pinned > MAIN_COMMIT_THRESHOLD {
main_tx.commit()?;
self.committed = true;
} else {
main_tx.delay()?;
self.committed = false;
}
index_tx.commit()?;
Ok(())
}
pub fn checkpoint(&self) -> Result<()> {
let mut main_tx = self.main.start_transaction();
let mut toast_tx = self.toast.start_transaction();
toast_tx.commit()?;
main_tx.commit()?;
Ok(())
}
pub fn get(&self, key: &[u8]) -> Result<Option<Value>> {
self.index
self.main
.get(&key.to_vec())
.transpose()
.and_then(|res| Some(res.and_then(|value| Ok(self.detoast(value)?))))
@@ -161,20 +188,22 @@ impl ToastStore {
pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
ToastIterator {
store: self,
iter: self.index.range(range),
iter: self.main.range(range),
}
}
pub fn remove(&self, key: &Key) -> Result<()> {
let mut index_tx = self.index.start_transaction();
if let Some(value) = index_tx.get(key)? {
pub fn remove(&mut self, key: &Key) -> Result<()> {
let mut main_tx = self.main.start_transaction();
if let Some(value) = main_tx.get(key)? {
main_tx.remove(key)?;
let main_pinned = main_tx.get_cache_info().pinned;
if value[0] == TOAST_VALUE_TAG {
let mut blobs_tx = self.blobs.start_transaction();
let mut toast_tx = self.toast.start_transaction();
let toast_ref = ToastRef::des(&value[1..])?;
let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1)
/ TOAST_SEGMENT_SIZE) as u32;
for segno in 0..n_segments {
blobs_tx.remove(
toast_tx.remove(
&ToastSegId {
toast_id: toast_ref.toast_id,
segno,
@@ -182,11 +211,31 @@ impl ToastStore {
.ser()?,
)?;
}
blobs_tx.commit()?;
// If we are going to commit main storage, then we have to commit toast storage first to avoid dangling references
if main_pinned > MAIN_COMMIT_THRESHOLD
|| toast_tx.get_cache_info().pinned > TOAST_COMMIT_THRESHOLD
{
toast_tx.commit()?;
} else {
toast_tx.delay()?;
}
}
index_tx.remove(key)?;
if main_pinned > MAIN_COMMIT_THRESHOLD {
main_tx.commit()?;
self.committed = true;
} else {
main_tx.delay()?;
self.committed = false;
}
} else {
self.committed = false;
}
index_tx.commit()?;
Ok(())
}
pub fn close(&self) -> Result<()> {
self.toast.close()?; // commit and close TOAST store first to avoid dangling references
self.main.close()?;
Ok(())
}
@@ -207,7 +256,7 @@ impl ToastStore {
segno: n_segments,
}
.ser()?;
for seg in self.blobs.range(from..till) {
for seg in self.toast.range(from..till) {
toast.extend_from_slice(&seg?.1);
}
Ok(lz4_flex::decompress(&toast, toast_ref.orig_size as usize)?)
@@ -217,3 +266,11 @@ impl ToastStore {
}
}
}
impl Drop for ToastStore {
fn drop(&mut self) {
info!("Storage closed");
// FIXME-KK: better call close() explicitly
self.close().unwrap();
}
}
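For context, a hedged end-to-end sketch of how the reworked ToastStore is used (the directory and keys are hypothetical; put delays its commit as described above, checkpoint() makes everything durable, and Drop calls close(), toast store first):

use std::path::Path;
use anyhow::Result;

fn example(dir: &Path) -> Result<()> {
    let mut store = ToastStore::new(dir)?;                  // opens main.db/main.log and toast.db/toast.log
    store.put(&b"small".to_vec(), &vec![1u8; 100])?;        // inlined with PLAIN_VALUE_TAG, commit likely delayed
    store.put(&b"large".to_vec(), &vec![0u8; 64 * 1024])?;  // compressed and split into TOAST segments
    let _value = store.get(b"small")?;                      // reads go through the same storage
    store.checkpoint()?;                                    // complete all delayed commits
    Ok(())                                                  // Drop closes the store: toast first, then main
}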