Separate checkpointing and page reconstruction

Konstantin Knizhnik
2021-12-06 12:24:13 +03:00
parent 2460f44e30
commit cb6e231ea9
4 changed files with 71 additions and 76 deletions
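Before the diff itself, a minimal sketch of the new control flow may help. It is inferred from the changes below and uses simplified stand-in types (a dummy Store, plain u64 LSNs); it is not the actual pageserver code. Previously checkpoint_internal(reconstruct_threshold, forced) both materialized pages and committed; after this commit, reconstruct_pages() does the WAL-replay work and then delegates durability to a slimmed-down checkpoint_internal().

    // Sketch only: stand-in types, inferred from the diff below.
    use std::sync::atomic::{AtomicU64, Ordering};

    type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

    struct Store;
    impl Store {
        fn commit(&self) -> Result<()> { Ok(()) } // stand-in for the yakv commit
    }

    struct Timeline {
        store: Store,
        reconstruct_threshold: u64,
        last_record_lsn: AtomicU64,
        disk_consistent_lsn: AtomicU64,
    }

    impl Timeline {
        // Pass 1: scan page histories, materialize pages whose pending WAL
        // exceeds the threshold, then fall through into a plain checkpoint.
        fn reconstruct_pages(&self) -> Result<()> {
            // ... iterate page versions; reconstruct where history_len
            //     >= self.reconstruct_threshold ...
            self.checkpoint_internal()
        }

        // Pass 2: durability only -- commit the store, advance
        // disk_consistent_lsn, and (in the real code) save metadata.
        fn checkpoint_internal(&self) -> Result<()> {
            let lsn = self.last_record_lsn.load(Ordering::SeqCst);
            self.store.commit()?;
            self.disk_consistent_lsn.fetch_max(lsn, Ordering::SeqCst);
            Ok(())
        }
    }

    fn main() -> Result<()> {
        let tl = Timeline {
            store: Store,
            reconstruct_threshold: 1024,
            last_record_lsn: AtomicU64::new(42),
            disk_consistent_lsn: AtomicU64::new(0),
        };
        tl.reconstruct_pages()
    }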

Cargo.lock

@@ -2572,9 +2572,9 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.2.6"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3aba428668b9459f60e48264aa9742f1648d24f6a7dd83c8cc73278797131d8f"
checksum = "17eba1abb31dda774cd901a9692a47aac716050975a30993c79826a08de47a34"
dependencies = [
"anyhow",
"fs2",

Cargo.toml

@@ -38,7 +38,7 @@ const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
#yakv = { path = "../../yakv" }
yakv = "0.2.6"
yakv = "0.2.7"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }


@@ -187,7 +187,6 @@ struct BufferedTimelineInner {
brin: BTreeMap<BrinTag, Lsn>,
last_checkpoint: Lsn,
last_gc: Lsn,
-last_commit: Lsn,
}
/// Public interface
@@ -416,9 +415,7 @@ impl BufferedRepository {
STORAGE_TIME
.with_label_values(&["checkpoint_timed"])
-.observe_closure_duration(|| {
-timeline.checkpoint_internal(conf.reconstruct_threshold, false)
-})?
+.observe_closure_duration(|| timeline.reconstruct_pages())?
}
// release lock on 'timelines'
}
@@ -897,12 +894,13 @@ impl Timeline for BufferedTimeline {
Ok(ZERO_PAGE.clone())
}
};
-/* // TODO: insertion materialized page in storage cause negative impact on performance.
-if let Some(key) = reconstruct_key {
-if let Ok(img) = &result {
-self.store.put(&StoreKey::Data(key).ser()?, &PageVersion::Page(img.clone()).ser()?)?;
-}
+/*
+// TODO: insertion materialized page in storage cause negative impact on performance.
+if let Some(key) = reconstruct_key {
+if let Ok(img) = &result {
+self.store.put(StoreKey::Data(key).ser()?, PageVersion::Page(img.clone()).ser()?)?;
+}
}
*/
result
}
@@ -985,7 +983,7 @@ impl Timeline for BufferedTimeline {
STORAGE_TIME
.with_label_values(&["checkpoint_force"])
//pass resonstruct_threshold=0 to force page materialization
-.observe_closure_duration(|| self.checkpoint_internal(0, true))
+.observe_closure_duration(|| self.checkpoint_internal())
}
///
@@ -1321,7 +1319,6 @@ impl BufferedTimeline {
brin: BTreeMap::new(),
last_checkpoint: Lsn(0),
last_gc: Lsn(0),
-last_commit: Lsn(0),
}),
walredo_mgr,
@@ -1497,7 +1494,7 @@ impl BufferedTimeline {
}
///
-/// Matrialize last page versions
+/// Materialize last page versions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
/// reconstruct_threshold is used to measure total length of applied WAL records.
@@ -1505,7 +1502,7 @@ impl BufferedTimeline {
/// and can be fast replayed. Alternatively we can measure interval from last version LSN:
/// it will enforce materialization of "stabilized" pages. But there is a risk that permanently updated page will never be materialized.
///
-fn checkpoint_internal(&self, reconstruct_threshold: u64, _forced: bool) -> Result<()> {
+fn reconstruct_pages(&self) -> Result<()> {
// From boundary is constant and till boundary is changed at each iteration.
let from = StoreKey::Data(DataKey {
rel: RelishTag::Relation(ZERO_TAG),
@@ -1526,7 +1523,7 @@ impl BufferedTimeline {
})
.ser()?; // this MAX values allows to use this boundary as exclusive
-let mut n_checkpointed_records = 0;
+let mut n_reconstructed_pages = 0;
let last_checkpoint;
{
let mut inner = self.inner.write();
@@ -1611,13 +1608,13 @@ impl BufferedTimeline {
// See comment above. May be we should also enforce here checkpointing of too old versions.
drop(iter);
drop(snapshot);
-if history_len as u64 >= reconstruct_threshold {
+if history_len as u64 >= self.conf.reconstruct_threshold {
let img = RECONSTRUCT_TIME.observe_closure_duration(|| {
self.reconstruct_page(dk.rel, dk.blknum, dk.lsn, data)
});
self.store.put(key, PageVersion::Page(img?).ser()?)?;
-n_checkpointed_records += 1;
+n_reconstructed_pages += 1;
}
}
// Jump to next page. Setting lsn=0 and using it as exclusive boundary allows us to jump to previous page.
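The doc comment and the hunk above reduce to a simple rule: sum the lengths of the WAL records accumulated for a page, and materialize the page only once replaying that history would exceed reconstruct_threshold. A hypothetical illustration (the helper and the numbers are made up; only the comparison mirrors the code):

    // Hypothetical helper mirroring `history_len as u64 >= reconstruct_threshold`.
    fn should_materialize(wal_record_lens: &[u64], reconstruct_threshold: u64) -> bool {
        let history_len: u64 = wal_record_lens.iter().sum();
        history_len >= reconstruct_threshold
    }

    fn main() {
        // Three small records (96 bytes total) stay as a cheap-to-replay chain.
        assert!(!should_materialize(&[32, 32, 32], 1024));
        // A long history crosses the threshold, so the page image is stored.
        assert!(should_materialize(&[512, 512, 128], 1024));
    }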
@@ -1634,34 +1631,41 @@ impl BufferedTimeline {
break;
}
}
-let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
-let ondisk_prev_record_lsn = self.get_prev_record_lsn();
-let metadata = TimelineMetadata {
-disk_consistent_lsn: self.disk_consistent_lsn.load(),
-prev_record_lsn: Some(ondisk_prev_record_lsn),
-ancestor_timeline: ancestor_timelineid,
-ancestor_lsn: self.ancestor_lsn,
-};
info!("Reconstruct {} pages", n_reconstructed_pages);
self.checkpoint_internal()
}
-BufferedRepository::save_metadata(
-self.conf,
-self.timelineid,
-self.tenantid,
-&metadata,
-false,
-)?;
-info!("Checkpoint {} records", n_checkpointed_records);
+fn checkpoint_internal(&self) -> Result<()> {
+let lsn = self.get_last_record_lsn();
+self.store.commit()?;
+if self.disk_consistent_lsn.fetch_max(lsn) == lsn {
+let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
+let ondisk_prev_record_lsn = self.get_prev_record_lsn();
+let metadata = TimelineMetadata {
+disk_consistent_lsn: self.disk_consistent_lsn.load(),
+prev_record_lsn: Some(ondisk_prev_record_lsn),
+ancestor_timeline: ancestor_timelineid,
+ancestor_lsn: self.ancestor_lsn,
+};
-if self.upload_relishes {
-schedule_timeline_upload(())
-// schedule_timeline_upload(
-// self.tenantid,
-// self.timelineid,
-// layer_uploads,
-// disk_consistent_lsn,
-// });
+BufferedRepository::save_metadata(
+self.conf,
+self.timelineid,
+self.tenantid,
+&metadata,
+false,
+)?;
+if self.upload_relishes {
+schedule_timeline_upload(())
+// schedule_timeline_upload(
+// self.tenantid,
+// self.timelineid,
+// layer_uploads,
+// disk_consistent_lsn,
+// });
+}
+}
Ok(())
}
@@ -1881,7 +1885,7 @@ impl BufferedTimeline {
}
break;
}
+self.checkpoint_internal()?;
result.elapsed = now.elapsed();
Ok(result)
}
@@ -2006,27 +2010,11 @@ impl<'a> BufferedTimelineWriter<'a> {
size: Some(blknum + 1),
};
self.tl.store.put(mk.ser()?, mv.ser()?)?;
-inner = self.tl.inner.write();
-/* Looks like we do not need to explicitly fill gap, because we in any case have to handle situation when
-* page in accessed before been wal logged
-// Fill gap with zero pages
-for blk in rel_size..blknum {
-let key = StoreKey::Data(DataKey {
-rel,
-blknum: blk,
-lsn,
-});
-store
-.data
-.put(&key.ser()?, &PageVersion::Image(ZERO_PAGE.clone()).ser()?)?;
-}
-*/
}
-if inner.last_commit + self.tl.conf.checkpoint_distance < lsn {
-inner.last_commit = lsn;
-} else {
-drop(inner);
-self.tl.store.commit()?;
-self.tl.disk_consistent_lsn.store(lsn);
-}
+if self.tl.disk_consistent_lsn.load() + self.tl.conf.checkpoint_distance < lsn {
+self.tl.checkpoint_internal()?
+}
Ok(())
}
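With last_commit gone from BufferedTimelineInner, the writer derives the delayed-commit trigger from disk_consistent_lsn alone: checkpoint once the WAL has advanced checkpoint_distance past the last durable point. A hedged sketch with plain u64 LSNs and made-up numbers:

    // Illustration of the new trigger condition; the numbers are arbitrary.
    fn needs_checkpoint(disk_consistent_lsn: u64, checkpoint_distance: u64, lsn: u64) -> bool {
        disk_consistent_lsn + checkpoint_distance < lsn
    }

    fn main() {
        // 40 KB of WAL since the last durable point: still inside a 64 KB window.
        assert!(!needs_checkpoint(1_000_000, 65_536, 1_040_000));
        // 100 KB of WAL: past the window, so checkpoint_internal() runs.
        assert!(needs_checkpoint(1_000_000, 65_536, 1_100_000));
    }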
@@ -2131,11 +2119,7 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
/// Complete all delayed commits and advance disk_consistent_lsn
///
fn checkpoint(&self) -> Result<()> {
-let lsn = self.tl.get_last_record_lsn();
-self.tl.store.commit()?;
-self.tl.inner.write().last_commit = lsn;
-self.tl.disk_consistent_lsn.store(lsn);
-Ok(())
+self.tl.checkpoint()
}
///


@@ -5,12 +5,19 @@ use std::ops::{Bound, RangeBounds};
use std::path::Path;
use yakv::storage::{
-Key, ReadOnlyTransaction, Select, Storage, StorageConfig, StorageIterator, Transaction, Value,
+Key,
+Select,
+// ReadOnlyTransaction,
+Snapshot,
+Storage,
+StorageConfig,
+StorageIterator,
+Transaction,
+Value,
};
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
//const CACHE_SIZE: usize = 128 * 1024; // 1Gb
///
/// Toast storage consistof two KV databases: one for storing main index
@@ -27,7 +34,8 @@ pub struct ToastIterator<'a> {
}
pub struct ToastSnapshot<'a> {
-tx: ReadOnlyTransaction<'a>,
+// tx: ReadOnlyTransaction<'a>,
+tx: Snapshot<'a>,
}
impl<'a> ToastSnapshot<'a> {
@@ -205,7 +213,8 @@ impl ToastStore {
key.extend_from_slice(&[0u8; 4]);
tx.put(&key, &value)?;
}
-tx.delay();
+tx.subcommit()?;
+//tx.delay();
Ok(())
}
@@ -217,14 +226,16 @@ impl ToastStore {
pub fn take_snapshot(&self) -> ToastSnapshot<'_> {
ToastSnapshot {
-tx: self.db.read_only_transaction(),
+//tx: self.db.read_only_transaction(),
+tx: self.db.take_snapshot(),
}
}
pub fn remove(&self, key: Key) -> Result<()> {
let mut tx = self.db.start_transaction();
self.tx_remove(&mut tx, &key)?;
-tx.delay();
+tx.subcommit()?;
+//tx.delay();
Ok(())
}
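The yakv 0.2.6 to 0.2.7 bump behind these hunks replaces ReadOnlyTransaction with Snapshot and the infallible tx.delay() with a fallible tx.subcommit(). The sketch below shows the migrated call pattern using only the methods visible in this diff; the anyhow error type and the exact Key/Value contents are assumptions, and no Storage constructor is shown because its signature does not appear here.

    use anyhow::Result;
    use yakv::storage::{Key, Storage, Value};

    // Write path: buffer work in a transaction, then subcommit (0.2.7)
    // where 0.2.6 would have called the infallible tx.delay().
    fn put_with_subcommit(db: &Storage, key: Key, value: Value) -> Result<()> {
        let mut tx = db.start_transaction();
        tx.put(&key, &value)?;
        tx.subcommit()?; // was: tx.delay();
        Ok(())
    }

    // Read path: take a Snapshot (0.2.7) where 0.2.6 used
    // db.read_only_transaction().
    fn read_path(db: &Storage) {
        let snapshot = db.take_snapshot();
        // ... look up / iterate through `snapshot` as before ...
        let _ = snapshot;
    }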