Use parking_lot synchronization primitives

This commit is contained in:
Konstantin Knizhnik
2021-11-30 18:28:35 +03:00
parent 5ad82418a9
commit e4b89a1849
6 changed files with 263 additions and 203 deletions

16
Cargo.lock generated
View File

@@ -902,9 +902,9 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb"
checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
dependencies = [
"scopeguard",
]
@@ -1190,6 +1190,7 @@ dependencies = [
"lazy_static",
"log",
"lz4_flex",
"parking_lot",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -1215,9 +1216,9 @@ dependencies = [
[[package]]
name = "parking_lot"
version = "0.11.1"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
@@ -1226,9 +1227,9 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.8.3"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if 1.0.0",
"instant",
@@ -2571,10 +2572,11 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.2.1"
version = "0.2.4"
dependencies = [
"anyhow",
"fs2",
"parking_lot",
]
[[package]]

View File

@@ -37,14 +37,15 @@ async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
#yakv = { path = "../../yakv" }
yakv = "0.2.2"
yakv = { path = "../../yakv" }
#yakv = "0.2.4"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" }
zenith_utils = { path = "../zenith_utils" }
workspace_hack = { path = "../workspace_hack" }
parking_lot = "0.11.2"
[dev-dependencies]
hex-literal = "0.3"

View File

@@ -18,6 +18,7 @@ use postgres_ffi::pg_constants::BLCKSZ;
use serde::{Deserialize, Serialize};
use tracing::*;
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::convert::TryInto;
use std::fs;
@@ -26,7 +27,7 @@ use std::io::Write;
use std::ops::{Bound::*, Deref};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
@@ -181,24 +182,24 @@ struct BrinTag {
//
// Relish store consists of persistent KV store and transient metadata cache loaded on demand
//
struct RelishStore {
data: ToastStore,
struct BufferedTimelineInner {
meta: Option<HashMap<RelishTag, MetadataSnapshot>>,
brin: BTreeMap<BrinTag, Lsn>,
last_checkpoint: Lsn,
last_gc: Lsn,
last_commit: Lsn,
}
/// Public interface
impl Repository for BufferedRepository {
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
let mut timelines = self.timelines.lock();
Ok(self.get_timeline_locked(timelineid, &mut timelines)?)
}
fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
let mut timelines = self.timelines.lock();
// Create the timeline directory, and write initial metadata to file.
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
@@ -281,7 +282,7 @@ impl Repository for BufferedRepository {
fn shutdown(&self) -> Result<()> {
trace!("BufferedRepository shutdown for tenant {}", self.tenantid);
let timelines = self.timelines.lock().unwrap();
let timelines = self.timelines.lock();
for (timelineid, timeline) in timelines.iter() {
walreceiver::stop_wal_receiver(*timelineid);
// Wait for syncing data to disk
@@ -298,7 +299,7 @@ impl Repository for BufferedRepository {
/// Private functions
impl BufferedRepository {
fn get_buffered_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<BufferedTimeline>> {
let mut timelines = self.timelines.lock().unwrap();
let mut timelines = self.timelines.lock();
self.get_timeline_locked(timelineid, &mut timelines)
}
@@ -404,11 +405,10 @@ impl BufferedRepository {
let timelines: Vec<(ZTimelineId, Arc<BufferedTimeline>)> = self
.timelines
.lock()
.unwrap()
.iter()
.map(|pair| (*pair.0, pair.1.clone()))
.collect();
//let timelines = self.timelines.lock().unwrap();
//let timelines = self.timelines.lock();
for (timelineid, timeline) in timelines.iter() {
let _entered =
info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid)
@@ -455,7 +455,6 @@ impl BufferedRepository {
let timelines: Vec<(ZTimelineId, Arc<BufferedTimeline>)> = self
.timelines
.lock()
.unwrap()
.iter()
.map(|pair| (*pair.0, pair.1.clone()))
.collect();
@@ -613,7 +612,7 @@ impl BufferedRepository {
// grab mutex to prevent new timelines from being created here.
// TODO: We will hold it for a long time
//let mut timelines = self.timelines.lock().unwrap();
//let mut timelines = self.timelines.lock();
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
@@ -725,7 +724,8 @@ pub struct BufferedTimeline {
tenantid: ZTenantId,
timelineid: ZTimelineId,
store: RwLock<RelishStore>, // provide MURSIW access to the storage
inner: RwLock<BufferedTimelineInner>,
store: ToastStore,
// WAL redo manager
walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
@@ -833,8 +833,8 @@ impl Timeline for BufferedTimeline {
let till = StoreKey::Data(DataKey { rel, blknum, lsn }).ser()?.to_vec();
//let mut reconstruct_key: Option<DataKey> = None;
let result = {
let store = self.store.read().unwrap();
let mut iter = store.data.range(&from..=&till);
let snapshot = self.store.take_snapshot();
let mut iter = snapshot.range(&from..=&till);
// locate the latest version with LSN <= the requested one
if let Some(entry) = iter.next_back() {
@@ -875,9 +875,15 @@ impl Timeline for BufferedTimeline {
bail!("Unexpected key type {:?}", key);
}
} else {
bail!("Base image not found for relish {} at {}", rel, lsn);
bail!(
"Base image not found for relish {} at {}",
dk.rel,
dk.lsn
);
}
}
drop(iter);
drop(snapshot);
RECONSTRUCT_TIME.observe_closure_duration(|| {
self.reconstruct_page(rel, blknum, lsn, data)
})
@@ -891,13 +897,10 @@ impl Timeline for BufferedTimeline {
Ok(ZERO_PAGE.clone())
}
};
/*
/* // TODO: inserting the materialized page into storage has a negative impact on performance.
if let Some(key) = reconstruct_key {
if let Ok(img) = &result {
let mut store = self.store.write().unwrap();
store
.data
.put(&StoreKey::Data(key).ser()?, &PageVersion::Page(img.clone()).ser()?)?;
self.store.put(&StoreKey::Data(key).ser()?, &PageVersion::Page(img.clone()).ser()?)?;
}
}
*/
@@ -913,14 +916,16 @@ impl Timeline for BufferedTimeline {
}
debug_assert!(lsn <= self.get_last_record_lsn());
let store = self.store.read().unwrap();
// Use metadata hash only if it was loaded
if let Some(hash) = &store.meta {
if let Some(snap) = hash.get(&rel) {
// We can used cached version only of requested LSN is >= than LSN of last version.
// Otherwise extract historical value from KV storage.
if snap.lsn <= lsn {
return Ok(Some(snap.size));
{
let inner = self.inner.read();
// Use metadata hash only if it was loaded
if let Some(hash) = &inner.meta {
if let Some(snap) = hash.get(&rel) {
// We can use the cached version only if the requested LSN is >= the LSN of the last version.
// Otherwise extract historical value from KV storage.
if snap.lsn <= lsn {
return Ok(Some(snap.size));
}
}
}
}
@@ -929,7 +934,8 @@ impl Timeline for BufferedTimeline {
.to_vec();
let till = StoreKey::Metadata(MetadataKey { rel, lsn }).ser()?.to_vec();
// locate the last version with LSN <= the requested one
let mut iter = store.data.range(&from..=&till);
let snapshot = self.store.take_snapshot();
let mut iter = snapshot.range(&from..=&till);
if let Some(pair) = iter.next_back() {
let meta = MetadataValue::des(&pair?.1)?;
@@ -999,9 +1005,9 @@ impl Timeline for BufferedTimeline {
});
let mut relsizes: HashMap<RelishTag, VecMap<Lsn, u32>> = HashMap::new();
let mut dropped: HashSet<RelishTag> = HashSet::new();
let store = self.store.read().unwrap();
'meta: loop {
let iter = store.data.range(&from.ser()?..);
let snapshot = self.store.take_snapshot();
let iter = snapshot.range(&from.ser()?..);
for entry in iter {
let pair = entry?;
@@ -1055,7 +1061,8 @@ impl Timeline for BufferedTimeline {
let mut last_lsn = Lsn(0);
let mut page_versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
'pages: loop {
let iter = store.data.range(&from.ser()?..);
let snapshot = self.store.take_snapshot();
let iter = snapshot.range(&from.ser()?..);
for entry in iter {
let pair = entry?;
if let StoreKey::Data(dk) = StoreKey::des(&pair.0)? {
@@ -1198,7 +1205,7 @@ impl Timeline for BufferedTimeline {
}
fn get_current_logical_size(&self) -> usize {
self.current_logical_size.load(Ordering::Acquire) as usize
self.store.size() as usize
}
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
@@ -1231,13 +1238,16 @@ impl Timeline for BufferedTimeline {
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
Box::new(BufferedTimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().unwrap(),
_write_guard: self.write_lock.lock(),
})
}
}
impl RelishStore {
fn load_metadata(&mut self) -> Result<&'_ mut HashMap<RelishTag, MetadataSnapshot>> {
impl BufferedTimelineInner {
fn get_metadata(
&mut self,
store: &ToastStore,
) -> Result<&'_ mut HashMap<RelishTag, MetadataSnapshot>> {
if self.meta.is_none() {
let mut meta: HashMap<RelishTag, MetadataSnapshot> = HashMap::new();
let mut till = StoreKey::Metadata(MetadataKey {
@@ -1245,9 +1255,12 @@ impl RelishStore {
lsn: Lsn::MAX,
});
loop {
let mut iter = self.data.range(..&till.ser()?);
let snapshot = store.take_snapshot();
let mut iter = snapshot.range(..&till.ser()?);
if let Some(entry) = iter.next_back() {
let pair = entry?;
drop(iter);
drop(snapshot);
let key = StoreKey::des(&pair.0)?;
if let StoreKey::Metadata(last) = key {
let metadata = MetadataValue::des(&pair.1)?;
@@ -1277,7 +1290,7 @@ impl RelishStore {
Ok(self.meta.as_mut().unwrap())
}
fn _unload_metadata(&mut self) {
fn _unget_metadata(&mut self) {
self.meta = None;
}
}
@@ -1302,12 +1315,13 @@ impl BufferedTimeline {
conf,
timelineid,
tenantid,
store: RwLock::new(RelishStore {
data: ToastStore::new(&path)?,
store: ToastStore::new(&path)?,
inner: RwLock::new(BufferedTimelineInner {
meta: None,
brin: BTreeMap::new(),
last_checkpoint: Lsn(0),
last_gc: Lsn(0),
last_commit: Lsn(0),
}),
walredo_mgr,
@@ -1369,14 +1383,16 @@ impl BufferedTimeline {
})
.ser()?; // Lsn::MAX tranforms inclusive boundary to exclusive
let store = self.store.read().unwrap();
// Iterate through relishes in reverse order (to locate the last version)
loop {
// Use exclusive boundary for till to be able to skip to previous relish
let mut iter = store.data.range(&from..&till);
let snapshot = self.store.take_snapshot();
let mut iter = snapshot.range(&from..&till);
if let Some(entry) = iter.next_back() {
// locate last version
let pair = entry?;
drop(iter);
drop(snapshot);
let key = StoreKey::des(&pair.0)?;
if let StoreKey::Metadata(mk) = key {
if mk.lsn <= lsn {
@@ -1395,10 +1411,13 @@ impl BufferedTimeline {
.ser()?;
let till = StoreKey::Metadata(MetadataKey { rel: mk.rel, lsn }).ser()?;
let mut iter = store.data.range(&from..=&till);
let snapshot = self.store.take_snapshot();
let mut iter = snapshot.range(&from..=&till);
if let Some(entry) = iter.next_back() {
// locate visible version
let pair = entry?;
drop(iter);
drop(snapshot);
let key = StoreKey::des(&pair.0)?;
if let StoreKey::Metadata(mk) = key {
let meta = MetadataValue::des(&pair.1)?;
@@ -1443,19 +1462,24 @@ impl BufferedTimeline {
}
fn make_snapshot(&self) -> Result<()> {
let store = self.store.read().unwrap();
let inner = self.inner.read();
let now = Instant::now();
if let Some(meta_hash) = &store.meta {
if let Some(meta_hash) = &inner.meta {
// Create copy of relation map to avoid holding lock in inner for a long time
let snapshot: Vec<(RelishTag, u32)> = meta_hash
.iter()
.map(|(rel, meta)| (*rel, meta.size))
.collect();
drop(inner);
let lsn = self.get_last_record_lsn();
for (rel, snap) in meta_hash.iter() {
let rel_size = snap.size;
for segno in 0..(rel_size + RELISH_SEG_SIZE - 1) / RELISH_SEG_SIZE {
for (rel, size) in snapshot {
for segno in 0..(size + RELISH_SEG_SIZE - 1) / RELISH_SEG_SIZE {
let first_blknum = segno * RELISH_SEG_SIZE;
let last_blknum = u32::min(first_blknum + RELISH_SEG_SIZE, rel_size);
let last_blknum = u32::min(first_blknum + RELISH_SEG_SIZE, size);
let images: Result<Vec<Bytes>> = (first_blknum..last_blknum)
.map(|blknum| self.get_page_at_lsn(*rel, blknum, lsn))
.map(|blknum| self.get_page_at_lsn(rel, blknum, lsn))
.collect();
let segtag = SegmentTag::from_blknum(*rel, first_blknum);
let segtag = SegmentTag::from_blknum(rel, first_blknum);
ImageLayer::create(
self.conf,
self.timelineid,
@@ -1505,14 +1529,13 @@ impl BufferedTimeline {
let mut n_checkpointed_records = 0;
let last_checkpoint;
{
let mut store = self.store.write().unwrap();
last_checkpoint = store.last_checkpoint;
store.last_checkpoint = self.get_last_record_lsn();
let mut inner = self.inner.write();
last_checkpoint = inner.last_checkpoint;
inner.last_checkpoint = self.get_last_record_lsn();
}
'outer: loop {
let store = self.store.read().unwrap();
let mut iter = store.data.range(&from..&till);
let snapshot = self.store.take_snapshot();
let mut iter = snapshot.range(&from..&till);
if let Some(entry) = iter.next_back() {
let pair = entry?;
let key = pair.0;
@@ -1523,26 +1546,29 @@ impl BufferedTimeline {
seg: dk.blknum / BRIN_SEGMENT_SIZE,
};
// At first iteration we need to scan the whole storage, because BRIN doesn't have enough information
if last_checkpoint != Lsn(0)
&& store
.brin
.get(&seg_tag)
.map_or(true, |lsn| *lsn <= last_checkpoint)
{
// This segment was not update since last checkpoint: jump to next one
let mut iter = store.brin.range(..seg_tag);
while let Some((next_seg, lsn)) = iter.next_back() {
if *lsn > last_checkpoint {
till = StoreKey::Data(DataKey {
rel: next_seg.rel,
blknum: (next_seg.seg + 1) * BRIN_SEGMENT_SIZE,
lsn: Lsn(0),
})
.ser()?;
continue 'outer;
let inner = self.inner.read();
if last_checkpoint != Lsn(0)
&& inner
.brin
.get(&seg_tag)
.map_or(true, |lsn| *lsn <= last_checkpoint)
{
// This segment was not updated since last checkpoint: jump to next one
let mut iter = inner.brin.range(..seg_tag);
while let Some((next_seg, lsn)) = iter.next_back() {
if *lsn > last_checkpoint {
till = StoreKey::Data(DataKey {
rel: next_seg.rel,
blknum: (next_seg.seg + 1) * BRIN_SEGMENT_SIZE,
lsn: Lsn(0),
})
.ser()?;
continue 'outer;
}
}
break;
}
break;
}
let ver = PageVersion::des(&pair.1)?;
if let PageVersion::Wal(rec) = ver {
@@ -1582,17 +1608,15 @@ impl BufferedTimeline {
bail!("Base image not found for relish {} at {}", dk.rel, dk.lsn);
}
}
// release locks and reconstruct page withut blocking storage
drop(iter);
drop(store);
// See comment above. May be we should also enforce here checkpointing of too old versions.
drop(iter);
drop(snapshot);
if history_len as u64 >= reconstruct_threshold {
let img = RECONSTRUCT_TIME.observe_closure_duration(|| {
self.reconstruct_page(dk.rel, dk.blknum, dk.lsn, data)
});
let mut store = self.store.write().unwrap();
store.data.put(key, PageVersion::Page(img?).ser()?)?;
self.store.put(key, PageVersion::Page(img?).ser()?)?;
n_checkpointed_records += 1;
}
}
@@ -1682,14 +1706,14 @@ impl BufferedTimeline {
let last_gc;
{
let mut store = self.store.write().unwrap();
last_gc = store.last_gc;
store.last_gc = self.get_last_record_lsn();
let mut inner = self.inner.write();
last_gc = inner.last_gc;
inner.last_gc = self.get_last_record_lsn();
}
'meta: loop {
let store = self.store.read().unwrap();
let iter = store.data.range(&from.ser()?..);
let snapshot = self.store.take_snapshot();
let iter = snapshot.range(&from.ser()?..);
// We can not remove deteriorated version immediately, we need to check first that successor exists
let mut last_key: Option<yakv::storage::Key> = None;
@@ -1716,9 +1740,8 @@ impl BufferedTimeline {
// If we are still on the same relish...
if same_rel {
// then drop the previous version as it is not needed any more
drop(store);
let mut store = self.store.write().unwrap();
store.data.remove(prev_key)?;
drop(snapshot);
self.store.remove(prev_key)?;
result.meta_removed += 1;
// We should reset iterator and start from the current point
continue 'meta;
@@ -1727,9 +1750,8 @@ impl BufferedTimeline {
let meta = MetadataValue::des(&pair.1)?;
if meta.size.is_none() {
// object was dropped, so we can immediately remove deteriorated version
drop(store);
let mut store = self.store.write().unwrap();
store.data.remove(raw_key)?;
drop(snapshot);
self.store.remove(raw_key)?;
dropped.insert(dk.rel);
result.meta_dropped += 1;
// We should reset iterator and start from the current point
@@ -1764,8 +1786,8 @@ impl BufferedTimeline {
// currently proceed block number
let mut from_blknum = 0;
'pages: loop {
let store = self.store.read().unwrap();
let iter = store.data.range(&from.ser()?..);
let snapshot = self.store.take_snapshot();
let iter = snapshot.range(&from.ser()?..);
// Array to accumulate keys we can remove.
let mut deteriorated: Vec<yakv::storage::Key> = Vec::new();
for entry in iter {
@@ -1779,22 +1801,25 @@ impl BufferedTimeline {
};
// At first iteration we need to scan the whole storage,
// because BRIN doesn't have enough information
if last_gc != Lsn(0)
&& store.brin.get(&seg_tag).map_or(true, |lsn| *lsn <= last_gc)
{
// This segment was not update since last GC: jump to next one
let mut iter = store.brin.range((Excluded(seg_tag), Unbounded));
while let Some((next_seg, lsn)) = iter.next_back() {
if *lsn > last_gc {
from = StoreKey::Data(DataKey {
rel: next_seg.rel,
blknum: next_seg.seg * BRIN_SEGMENT_SIZE,
lsn: Lsn(0),
});
continue 'pages;
let inner = self.inner.read();
if last_gc != Lsn(0)
&& inner.brin.get(&seg_tag).map_or(true, |lsn| *lsn <= last_gc)
{
// This segment was not updated since last GC: jump to next one
let mut iter = inner.brin.range((Excluded(seg_tag), Unbounded));
while let Some((next_seg, lsn)) = iter.next() {
if *lsn > last_gc {
from = StoreKey::Data(DataKey {
rel: next_seg.rel,
blknum: next_seg.seg * BRIN_SEGMENT_SIZE,
lsn: Lsn(0),
});
continue 'pages;
}
}
break;
}
break;
}
let same_page = from_rel == dk.rel && from_blknum == dk.blknum;
if !same_page {
@@ -1817,11 +1842,10 @@ impl BufferedTimeline {
let ver = PageVersion::des(&pair.1)?;
if let PageVersion::Page(_) = ver {
// ... then remove all previously accumulated deltas and images, as they are not needed any more
drop(store);
let mut store = self.store.write().unwrap();
result.pages_removed += deteriorated.len() as u64;
drop(snapshot);
for key in deteriorated {
store.data.remove(key)?;
self.store.remove(key)?;
}
// We should reset iterator and start from the current point
continue 'pages;
@@ -1835,10 +1859,9 @@ impl BufferedTimeline {
// This relation was dropped beyond PITR interval:
// we can remove all its pages
assert!(deteriorated.is_empty()); // we should not append anything to `deteriorated` for dropped relation
drop(store);
let mut store = self.store.write().unwrap();
// We should reset iterator and start from the current point
store.data.remove(raw_key)?;
// We should reset iterator and start from the current point
drop(snapshot);
self.store.remove(raw_key)?;
result.pages_dropped += 1;
continue 'pages;
}
@@ -1956,9 +1979,9 @@ impl<'a> BufferedTimelineWriter<'a> {
}
ensure!(lsn.is_aligned(), "unaligned record LSN");
let key = StoreKey::Data(DataKey { rel, blknum, lsn });
let mut store = self.tl.store.write().unwrap();
store.data.put(key.ser()?, ver.ser()?)?;
store.brin.insert(
self.tl.store.put(key.ser()?, ver.ser()?)?;
let mut inner = self.tl.inner.write();
inner.brin.insert(
BrinTag {
rel,
seg: blknum / BRIN_SEGMENT_SIZE,
@@ -1967,7 +1990,7 @@ impl<'a> BufferedTimelineWriter<'a> {
);
// Update metadata
let meta_hash = store.load_metadata()?;
let meta_hash = inner.get_metadata(&self.tl.store)?;
let rel_size = meta_hash.get(&rel).map(|m| m.size).unwrap_or(0);
if rel_size <= blknum {
meta_hash.insert(
@@ -1977,11 +2000,13 @@ impl<'a> BufferedTimelineWriter<'a> {
lsn,
},
);
drop(inner);
let mk = StoreKey::Metadata(MetadataKey { rel, lsn });
let mv = MetadataValue {
size: Some(blknum + 1),
};
store.data.put(mk.ser()?, mv.ser()?)?;
self.tl.store.put(mk.ser()?, mv.ser()?)?;
inner = self.tl.inner.write();
/* Looks like we do not need to explicitly fill the gap, because in any case we have to handle the situation when
* a page is accessed before being WAL-logged
// Fill gap with zero pages
@@ -1997,8 +2022,10 @@ impl<'a> BufferedTimelineWriter<'a> {
}
*/
}
if store.data.commit_lsn + self.tl.conf.checkpoint_distance < lsn {
store.data.commit(lsn)?;
if inner.last_commit + self.tl.conf.checkpoint_distance < lsn {
inner.last_commit = lsn;
drop(inner);
self.tl.store.commit()?;
self.tl.disk_consistent_lsn.store(lsn);
}
Ok(())
@@ -2073,26 +2100,30 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn);
let mut store = self.tl.store.write().unwrap();
let meta_hash = store.load_metadata()?;
meta_hash.insert(rel, MetadataSnapshot { size: relsize, lsn });
let mk = StoreKey::Metadata(MetadataKey { rel, lsn });
let mv = MetadataValue {
size: Some(relsize),
};
store.data.put(mk.ser()?, mv.ser()?)?;
self.tl.store.put(mk.ser()?, mv.ser()?)?;
let mut inner = self.tl.inner.write();
let meta_hash = inner.get_metadata(&self.tl.store)?;
meta_hash.insert(rel, MetadataSnapshot { size: relsize, lsn });
Ok(())
}
fn drop_relish(&self, rel: RelishTag, lsn: Lsn) -> Result<()> {
trace!("drop_segment: {} at {}", rel, lsn);
let mut store = self.tl.store.write().unwrap();
let meta_hash = store.load_metadata()?;
meta_hash.remove(&rel);
let mk = StoreKey::Metadata(MetadataKey { rel, lsn });
let mv = MetadataValue { size: None }; // None indicates dropped relation
store.data.put(mk.ser()?, mv.ser()?)?;
self.tl.store.put(mk.ser()?, mv.ser()?)?;
let mut inner = self.tl.inner.write();
let meta_hash = inner.get_metadata(&self.tl.store)?;
meta_hash.remove(&rel);
Ok(())
}
@@ -2100,9 +2131,9 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
/// Complete all delayed commits and advance disk_consistent_lsn
///
fn checkpoint(&self) -> Result<()> {
let mut store = self.tl.store.write().unwrap();
let lsn = self.tl.get_last_record_lsn();
store.data.commit(lsn)?;
self.tl.store.commit()?;
self.tl.inner.write().last_commit = lsn;
self.tl.disk_consistent_lsn.store(lsn);
Ok(())
}

View File

@@ -37,7 +37,7 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(250);
pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(3600);
pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0;

View File

@@ -425,11 +425,20 @@ pub fn save_decoded_record(
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
});
//
// Instead of storing full-page-image WAL record,
// it is better to store extracted image: we can skip wal-redo
// in this case. Also some FPI records may contain multiple (up to 32) pages,
// so they have to be copied multiple times.
//
if blk.apply_image
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0
{
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
@@ -437,9 +446,6 @@ pub fn save_decoded_record(
let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize);
image.extend_from_slice(&recdata[img_offs..img_offs + img_len]);
// Compression of WAL is not yet supported
assert!((blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0);
if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);
image.resize(image.len() + blk.hole_length as usize, 0u8);

View File

@@ -3,9 +3,10 @@ use lz4_flex;
use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use zenith_utils::lsn::Lsn;
use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value};
use yakv::storage::{
Key, ReadOnlyTransaction, Storage, StorageConfig, StorageIterator, Transaction, Value,
};
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
@@ -18,21 +19,62 @@ const CACHE_SIZE: usize = 32 * 1024; // 256Mb
/// data locality and reduce key size for TOAST segments.
///
pub struct ToastStore {
db: Storage, // key-value database
pub commit_lsn: Lsn, // LSN of last committed transaction
db: Storage, // key-value database
}
pub struct ToastIterator<'a> {
iter: StorageIterator<'a>,
}
/// Read-only view of a `ToastStore`, backed by a yakv `ReadOnlyTransaction`.
///
/// Created by `ToastStore::take_snapshot`; entries are read through `range` / `iter`.
pub struct ToastSnapshot<'a> {
// Read-only transaction that `range` scans.
tx: ReadOnlyTransaction<'a>,
}
impl<'a> ToastSnapshot<'a> {
    /// Iterates over the entries of this snapshot whose logical key lies in `range`.
    ///
    /// Callers pass logical keys. Keys in the underlying storage carry a 4-byte
    /// suffix (big-endian segment count followed by big-endian segment number;
    /// all zeros for a value stored as a single entry), so every bound has to be
    /// widened to cover, or to skip, all stored entries of the bounding key:
    ///
    /// * inclusive start / exclusive end -> lowest suffix (`00 00 00 00`)
    /// * inclusive end / exclusive start -> highest suffix (`FF FF FF FF`)
    ///
    /// Previously the exclusive bounds used the opposite suffixes, which let the
    /// entries of the excluded key leak into the result (an exclusive end behaved
    /// like an inclusive one, and an exclusive start still returned the TOAST
    /// segments of the excluded key).
    pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
        const LOWEST_SUFFIX: [u8; 4] = [0u8; 4];
        const HIGHEST_SUFFIX: [u8; 4] = [0xFFu8; 4];

        // Builds the stored-key form of a logical key with the given suffix.
        fn with_suffix(key: &Key, suffix: &[u8]) -> Key {
            let mut stored = key.clone();
            stored.extend_from_slice(suffix);
            stored
        }

        let from = match range.start_bound() {
            Bound::Included(key) => Bound::Included(with_suffix(key, &LOWEST_SUFFIX)),
            // Skip every stored entry of `key`, not just its first one.
            Bound::Excluded(key) => Bound::Excluded(with_suffix(key, &HIGHEST_SUFFIX)),
            Bound::Unbounded => Bound::Unbounded,
        };
        let till = match range.end_bound() {
            Bound::Included(key) => Bound::Included(with_suffix(key, &HIGHEST_SUFFIX)),
            // Stop before the first stored entry of `key`.
            Bound::Excluded(key) => Bound::Excluded(with_suffix(key, &LOWEST_SUFFIX)),
            Bound::Unbounded => Bound::Unbounded,
        };
        ToastIterator {
            iter: self.tx.range((from, till)),
        }
    }

    /// Iterates over all entries of this snapshot.
    pub fn iter(&self) -> ToastIterator<'_> {
        self.range(..)
    }
}
impl<'a> Iterator for ToastIterator<'a> {
type Item = Result<(Key, Value)>;
fn next(&mut self) -> Option<Self::Item> {
let mut toast: Option<Vec<u8>> = None;
let mut next_segno = 0u16;
for elem in &mut self.iter {
if let Ok((key, value)) = elem {
let res = if let Ok((key, value)) = elem {
let key_len = key.len();
let n_segments =
u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
@@ -46,20 +88,22 @@ impl<'a> Iterator for ToastIterator<'a> {
}
toast.as_mut().unwrap().extend_from_slice(&value);
next_segno = segno + 1;
if next_segno == n_segments {
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
if next_segno != n_segments {
continue;
}
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
}
} else {
return Some(Ok((key, value)));
Ok((key, value))
}
} else {
return Some(elem);
}
elem
};
return Some(res);
}
assert_eq!(next_segno, 0);
None
@@ -125,14 +169,15 @@ impl ToastStore {
StorageConfig {
cache_size: CACHE_SIZE,
nosync: false,
mursiw: true,
},
)?,
commit_lsn: Lsn(0),
})
}
pub fn put(&mut self, key: Key, value: Value) -> Result<()> {
pub fn put(&self, key: Key, value: Value) -> Result<()> {
let mut tx = self.db.start_transaction();
self.tx_remove(&mut tx, &key)?;
let value_len = value.len();
let mut key = key;
if value_len >= TOAST_SEGMENT_SIZE {
@@ -146,7 +191,7 @@ impl ToastStore {
key.extend_from_slice(&n_segments.to_be_bytes());
key.extend_from_slice(&[0u8; 2]);
let key_len = key.len();
while offs + TOAST_SEGMENT_SIZE <= compressed_data_len {
while offs + TOAST_SEGMENT_SIZE < compressed_data_len {
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(
&key,
@@ -155,10 +200,8 @@ impl ToastStore {
offs += TOAST_SEGMENT_SIZE;
segno += 1;
}
if offs < compressed_data_len {
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(&key, &compressed_data[offs..].to_vec())?;
}
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(&key, &compressed_data[offs..].to_vec())?;
} else {
key.extend_from_slice(&[0u8; 4]);
tx.put(&key, &value)?;
@@ -167,53 +210,27 @@ impl ToastStore {
Ok(())
}
pub fn commit(&mut self, commit_lsn: Lsn) -> Result<()> {
let mut tx = self.db.start_transaction();
pub fn commit(&self) -> Result<()> {
let tx = self.db.start_transaction();
tx.commit()?;
self.commit_lsn = commit_lsn;
Ok(())
}
pub fn iter(&self) -> ToastIterator<'_> {
self.range(..)
}
pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
let from = match range.start_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8; 4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8; 4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded,
};
let till = match range.end_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8; 4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8; 4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded,
};
ToastIterator {
iter: self.db.range((from, till)),
/// Opens a read-only transaction on the underlying database and wraps it in a
/// `ToastSnapshot`.
pub fn take_snapshot(&self) -> ToastSnapshot<'_> {
    let tx = self.db.read_only_transaction();
    ToastSnapshot { tx }
}
pub fn remove(&mut self, key: Key) -> Result<()> {
pub fn remove(&self, key: Key) -> Result<()> {
let mut tx = self.db.start_transaction();
self.tx_remove(&mut tx, &key)?;
tx.delay()
}
pub fn tx_remove(&self, tx: &mut Transaction, key: &Key) -> Result<()> {
let mut min_key = key.clone();
let mut max_key = key;
let mut max_key = key.clone();
min_key.extend_from_slice(&[0u8; 4]);
max_key.extend_from_slice(&[0xFFu8; 4]);
let mut iter = tx.range(&min_key..&max_key);
@@ -231,7 +248,10 @@ impl ToastStore {
tx.remove(&key)?;
}
}
tx.delay()?;
Ok(())
}
/// Returns the `db_used` value reported by the underlying database's info.
pub fn size(&self) -> u64 {
    let info = self.db.get_database_info();
    info.db_used
}
}