Do not produce an error in get_page_at_lsn on a missing page

Konstantin Knizhnik
2021-10-26 20:07:11 +03:00
parent 0b6008012d
commit 497258c6fe
3 changed files with 289 additions and 87 deletions


@@ -37,7 +37,8 @@ async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
yakv = "0.1.3"
#yakv = { path = "../../yakv" }
yakv = "0.1.5"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }


@@ -42,9 +42,7 @@ use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use zenith_metrics::{
register_histogram, register_int_gauge_vec, Histogram, IntGauge, IntGaugeVec,
};
use zenith_metrics::{register_histogram, register_int_gauge_vec, Histogram, IntGaugeVec};
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::crashsafe_dir;
@@ -376,7 +374,7 @@ impl BufferedRepository {
fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> {
while !tenant_mgr::shutdown_requested() {
std::thread::sleep(conf.checkpoint_period);
info!("checkpointer thread for tenant {} waking up", self.tenantid);
trace!("checkpointer thread for tenant {} waking up", self.tenantid);
// checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE
// bytes of WAL since last checkpoint.
@@ -697,15 +695,6 @@ pub struct BufferedTimeline {
// With compressed KV storage they are completely different.
current_logical_size: AtomicUsize, // bytes
// To avoid calling .with_label_values and formatting the tenant and timeline IDs to strings
// every time the logical size is updated, keep a direct reference to the Gauge here.
// unfortunately it doesn't forward atomic methods like .fetch_add
// so use two fields: actual size and metric
// see https://github.com/zenithdb/zenith/issues/622 for discussion
// TODO: it is possible to combine these two fields into a single one using a custom metric
// that uses SeqCst ordering for its operations, but that involves private modules and macro trickery
current_logical_size_gauge: IntGauge,
/// If `true`, will back up its timeline files to remote storage after freezing.
upload_relishes: bool,
@@ -753,6 +742,8 @@ impl Timeline for BufferedTimeline {
rel
);
}
// FIXME-KK: navigation to ancestor timelines is not yet supported
assert!(self.ancestor_timeline.is_none());
debug_assert!(lsn <= self.get_last_record_lsn());
let from = StoreKey::Data(DataKey {
@@ -809,7 +800,8 @@ impl Timeline for BufferedTimeline {
}
}
} else {
bail!("relish {} not found at {}", rel, lsn);
warn!("block {} of relish {} not found at {}", blknum, rel, lsn);
Ok(ZERO_PAGE.clone())
}
}
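// A minimal sketch of the new fallback behavior (hypothetical helper, not the
// pageserver API): a read of a block that was never materialized now yields an
// all-zeros page instead of an error, mirroring how PostgreSQL treats holes in
// a relation file.
fn get_page_sketch(pages: &std::collections::HashMap<u32, Vec<u8>>, blknum: u32) -> Vec<u8> {
    const PAGE_SZ: usize = 8192;
    match pages.get(&blknum) {
        Some(page) => page.clone(),
        // No stored version: log-and-return-zeros lets the caller proceed.
        None => vec![0u8; PAGE_SZ],
    }
}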
@@ -951,7 +943,7 @@ impl Timeline for BufferedTimeline {
}
impl RelishStore {
fn load_metadata(&mut self) -> Result<()> {
fn load_metadata(&mut self) -> Result<&'_ mut HashMap<RelishTag, MetadataSnapshot>> {
if self.meta.is_none() {
let mut meta: HashMap<RelishTag, MetadataSnapshot> = HashMap::new();
let mut till = StoreKey::Metadata(MetadataKey {
@@ -964,7 +956,7 @@ impl RelishStore {
let pair = entry?;
let key = StoreKey::des(&pair.0)?;
if let StoreKey::Metadata(last) = key {
let metadata = MetadataValue::des(&pair.0)?;
let metadata = MetadataValue::des(&pair.1)?;
if let Some(size) = metadata.size {
// ignore dropped relations
meta.insert(
@@ -988,7 +980,7 @@ impl RelishStore {
}
self.meta = Some(meta)
}
Ok(())
Ok(self.meta.as_mut().unwrap())
}
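// A minimal sketch of the lazy-init-and-return-reference pattern that the new
// load_metadata signature enables (simplified, infallible types; the real
// method is fallible and loads from the store):
struct MetaCache {
    meta: Option<std::collections::HashMap<String, u32>>,
}
impl MetaCache {
    fn load_metadata(&mut self) -> &mut std::collections::HashMap<String, u32> {
        if self.meta.is_none() {
            // populate from disk here
            self.meta = Some(std::collections::HashMap::new());
        }
        self.meta.as_mut().unwrap() // safe: initialized just above
    }
}
// Callers can now write `let meta = store.load_metadata()?;` instead of
// chaining `store.meta.as_mut().unwrap()` after every call.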
fn _unload_metadata(&mut self) {
@@ -1011,9 +1003,6 @@ impl BufferedTimeline {
current_logical_size: usize,
upload_relishes: bool,
) -> Result<BufferedTimeline> {
let current_logical_size_gauge = LOGICAL_TIMELINE_SIZE
.get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()])
.unwrap();
let path = conf.timeline_path(&timelineid, &tenantid);
let timeline = BufferedTimeline {
conf,
@@ -1036,7 +1025,6 @@ impl BufferedTimeline {
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn,
current_logical_size: AtomicUsize::new(current_logical_size),
current_logical_size_gauge,
upload_relishes,
write_lock: Mutex::new(()),
@@ -1063,6 +1051,7 @@ impl BufferedTimeline {
//
// List all relishes in the inclusive range [from_rel, till_rel] that exist at the specified LSN
//
fn list_relishes(
&self,
from_rel: RelishTag,
@@ -1175,6 +1164,7 @@ impl BufferedTimeline {
})
.ser()?; // these MAX values allow this boundary to be used as an exclusive one
let mut n_checkpointed_records = 0;
loop {
let store = self.store.read().unwrap();
@@ -1231,7 +1221,8 @@ impl BufferedTimeline {
});
let mut store = self.store.write().unwrap();
store.data.put(&key, &img?.to_vec())?;
store.data.put(&key, &PageVersion::Image(img?).ser()?)?;
n_checkpointed_records += 1;
}
}
// Jump to the next page. Setting lsn=0 and using it as an exclusive boundary allows us to jump to the previous page.
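// Both boundary comments above rely on big-endian key serialization (BeSer),
// which makes composite keys order lexicographically the same way they order
// numerically. A sketch with plain u32/u64 fields in place of the real StoreKey:
fn composite_key(blknum: u32, lsn: u64) -> Vec<u8> {
    let mut k = blknum.to_be_bytes().to_vec();
    k.extend_from_slice(&lsn.to_be_bytes());
    k
}
// composite_key(7, 0) <= every stored version of block 7 < composite_key(8, 0),
// so a reverse scan bounded (exclusively) by composite_key(7, 0) lands on block 6.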
@@ -1248,6 +1239,24 @@ impl BufferedTimeline {
break;
}
}
let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
let ondisk_prev_record_lsn = self.get_prev_record_lsn();
let metadata = TimelineMetadata {
disk_consistent_lsn: self.disk_consistent_lsn.load(),
prev_record_lsn: Some(ondisk_prev_record_lsn),
ancestor_timeline: ancestor_timelineid,
ancestor_lsn: self.ancestor_lsn,
};
BufferedRepository::save_metadata(
self.conf,
self.timelineid,
self.tenantid,
&metadata,
false,
)?;
trace!("Checkpoint {} records", n_checkpointed_records);
if self.upload_relishes {
schedule_timeline_upload(())
// schedule_timeline_upload(
@@ -1372,8 +1381,14 @@ pub enum PageVersion {
Delta(WALRecord),
}
impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
fn put_wal_record(&self, lsn: Lsn, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> {
impl<'a> BufferedTimelineWriter<'a> {
fn put_page_version(
&self,
rel: RelishTag,
blknum: u32,
lsn: Lsn,
ver: PageVersion,
) -> Result<()> {
if !rel.is_blocky() && blknum != 0 {
bail!(
"invalid request for block {} for non-blocky relish {}",
@@ -1382,24 +1397,15 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
);
}
ensure!(lsn.is_aligned(), "unaligned record LSN");
let key = StoreKey::Data(DataKey { rel, blknum, lsn });
let value = PageVersion::Delta(rec);
let mut store = self.tl.store.write().unwrap();
store.data.put(&key.ser()?, &value.ser()?)?;
store.data.put(&key.ser()?, &ver.ser()?)?;
// Update metadata
store.load_metadata()?;
if store
.meta
.as_ref()
.unwrap()
.get(&rel)
.map(|m| m.size)
.unwrap_or(0)
<= blknum
{
store.meta.as_mut().unwrap().insert(
let meta_hash = store.load_metadata()?;
let rel_size = meta_hash.get(&rel).map(|m| m.size).unwrap_or(0);
if rel_size <= blknum {
meta_hash.insert(
rel,
MetadataSnapshot {
size: blknum + 1,
@@ -1411,52 +1417,32 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
size: Some(blknum + 1),
};
store.data.put(&mk.ser()?, &mv.ser()?)?;
/*
// Fill gap with zero pages
for blk in rel_size..blknum {
let key = StoreKey::Data(DataKey {
rel,
blknum: blk,
lsn,
});
store
.data
.put(&key.ser()?, &PageVersion::Image(ZERO_PAGE.clone()).ser()?)?;
}
*/
}
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
Ok(())
}
}
impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
fn put_wal_record(&self, lsn: Lsn, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> {
self.put_page_version(rel, blknum, lsn, PageVersion::Delta(rec))
}
fn put_page_image(&self, rel: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> {
if !rel.is_blocky() && blknum != 0 {
bail!(
"invalid request for block {} for non-blocky relish {}",
blknum,
rel
);
}
ensure!(lsn.is_aligned(), "unaligned record LSN");
let key = StoreKey::Data(DataKey { rel, blknum, lsn });
let value = PageVersion::Image(img);
let mut store = self.tl.store.write().unwrap();
store.data.put(&key.ser()?, &value.ser()?)?;
// Update metadata
store.load_metadata()?;
if store
.meta
.as_ref()
.unwrap()
.get(&rel)
.map(|m| m.size)
.unwrap_or(0)
<= blknum
{
store.meta.as_mut().unwrap().insert(
rel,
MetadataSnapshot {
size: blknum + 1,
lsn,
},
);
let mk = StoreKey::Metadata(MetadataKey { rel, lsn });
let mv = MetadataValue {
size: Some(blknum + 1),
};
store.data.put(&mk.ser()?, &mv.ser()?)?;
}
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
Ok(())
self.put_page_version(rel, blknum, lsn, PageVersion::Image(img))
}
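// The shape of this refactoring, reduced to a sketch with hypothetical types:
// both trait methods normalize their payload into one enum variant and funnel
// through a single write path, so key serialization and metadata bookkeeping
// exist in exactly one place.
enum Payload {
    Delta(Vec<u8>),
    Image(Vec<u8>),
}
struct Writer;
impl Writer {
    fn put_version(&self, blknum: u32, ver: Payload) {
        // serialize key and value, write to the store, update relation metadata
        let _ = (blknum, ver);
    }
    fn put_record(&self, blknum: u32, rec: Vec<u8>) {
        self.put_version(blknum, Payload::Delta(rec))
    }
    fn put_image(&self, blknum: u32, img: Vec<u8>) {
        self.put_version(blknum, Payload::Image(img))
    }
}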
fn put_truncation(&self, rel: RelishTag, lsn: Lsn, relsize: u32) -> Result<()> {
@@ -1468,12 +1454,8 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn);
let mut store = self.tl.store.write().unwrap();
store.load_metadata()?;
store
.meta
.as_mut()
.unwrap()
.insert(rel, MetadataSnapshot { size: relsize, lsn });
let meta_hash = store.load_metadata()?;
meta_hash.insert(rel, MetadataSnapshot { size: relsize, lsn });
let mk = StoreKey::Metadata(MetadataKey { rel, lsn });
let mv = MetadataValue {
size: Some(relsize),
@@ -1489,8 +1471,8 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
trace!("drop_segment: {} at {}", rel, lsn);
let mut store = self.tl.store.write().unwrap();
store.load_metadata()?;
store.meta.as_mut().unwrap().remove(&rel);
let meta_hash = store.load_metadata()?;
meta_hash.remove(&rel);
let mk = StoreKey::Metadata(MetadataKey { rel, lsn });
let mv = MetadataValue { size: None }; // None indicates dropped relation
store.data.put(&mk.ser()?, &mv.ser()?)?;


@@ -0,0 +1,219 @@
use anyhow::Result;
use lz4_flex;
use serde::{Deserialize, Serialize};
use std::ops::RangeBounds;
use std::path::Path;
use yakv::storage::{Key, Storage, StorageIterator, Value};
use zenith_utils::bin_ser::BeSer;
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
const CACHE_SIZE: usize = 1024; // 8MB
const TOAST_VALUE_TAG: u8 = 0;
const PLAIN_VALUE_TAG: u8 = 1;
type ToastId = u32;
///
/// TOAST storage consists of two KV databases: one for storing the main index
/// and a second for storing sliced BLOBs (values larger than 2KB).
/// BLOBs and main data are stored in different databases to improve
/// data locality and reduce key size for TOAST segments.
///
pub struct ToastStore {
pub update_count: u64,
pub index: Storage, // primary storage
blobs: Storage, // storage for TOAST segments
next_id: ToastId, // counter used to identify new TOAST segments
}
///
/// TOAST reference
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct ToastRef {
toast_id: ToastId, // assigned TOAST identifier
orig_size: u32, // Original (uncompressed) value size
compressed_size: u32, // Compressed object size
}
///
/// Identifier of TOAST segment.
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct ToastSegId {
toast_id: ToastId,
segno: u32, // segment number used to extract segments in proper order
}
pub struct ToastIterator<'a> {
store: &'a ToastStore,
iter: StorageIterator<'a>,
}
impl<'a> Iterator for ToastIterator<'a> {
type Item = Result<(Key, Value)>;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().and_then(|res| {
Some(res.and_then(|(key, value)| Ok((key, self.store.detoast(value)?))))
})
}
}
impl<'a> DoubleEndedIterator for ToastIterator<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
self.iter.next_back().and_then(|res| {
Some(res.and_then(|(key, value)| Ok((key, self.store.detoast(value)?))))
})
}
}
//
// FIXME-KK: not using WAL now. Implement asynchronous or delayed commit.
//
impl ToastStore {
pub fn new(path: &Path) -> Result<ToastStore> {
Ok(ToastStore {
update_count: 0,
index: Storage::open(
&path.join("index.db"),
None,
CACHE_SIZE,
CHECKPOINT_INTERVAL,
)?,
blobs: Storage::open(
&path.join("blobs.db"),
None,
CACHE_SIZE,
CHECKPOINT_INTERVAL,
)?,
next_id: 0,
})
}
pub fn put(&mut self, key: &Key, value: &Value) -> Result<()> {
let mut index_tx = self.index.start_transaction();
let value_len = value.len();
self.update_count += 1;
if value_len >= TOAST_SEGMENT_SIZE {
let mut blobs_tx = self.blobs.start_transaction();
if self.next_id == 0 {
self.next_id = blobs_tx
.iter()
.next_back()
.transpose()?
.map_or(0u32, |(key, _value)| {
ToastSegId::des(&key).unwrap().toast_id
});
}
self.next_id += 1;
let toast_id = self.next_id;
let compressed_data = lz4_flex::compress(value);
let compressed_data_len = compressed_data.len();
let mut offs: usize = 0;
let mut segno = 0u32;
while offs + TOAST_SEGMENT_SIZE <= compressed_data_len {
blobs_tx.put(
&ToastSegId { toast_id, segno }.ser()?,
&compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(),
)?;
offs += TOAST_SEGMENT_SIZE;
segno += 1;
}
if offs < compressed_data_len {
blobs_tx.put(
&ToastSegId { toast_id, segno }.ser()?,
&compressed_data[offs..].to_vec(),
)?;
}
let mut value = ToastRef {
toast_id,
orig_size: value_len as u32,
compressed_size: compressed_data_len as u32,
}
.ser()?;
value.insert(0, TOAST_VALUE_TAG);
index_tx.put(key, &value)?;
blobs_tx.commit()?;
} else {
let mut vec = Vec::with_capacity(value.len() + 1);
vec.push(PLAIN_VALUE_TAG);
vec.extend_from_slice(&value);
index_tx.put(key, &vec)?;
}
index_tx.commit()?;
Ok(())
}
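// The slicing arithmetic above, checked on a concrete value: a payload that
// compresses to 5000 bytes is written as two full 2048-byte segments by the
// while loop plus a 904-byte tail by the trailing if; the ceil division used
// in remove() and detoast() recovers the same count:
//   (5000 + 2048 - 1) / 2048 == 3, matching 2048 + 2048 + 904;
//   (4096 + 2048 - 1) / 2048 == 2, an exact multiple with no tail segment.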
pub fn get(&self, key: &[u8]) -> Result<Option<Value>> {
self.index
.get(&key.to_vec())
.transpose()
.and_then(|res| Some(res.and_then(|value| Ok(self.detoast(value)?))))
.transpose()
}
pub fn iter(&self) -> ToastIterator<'_> {
self.range(..)
}
pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
ToastIterator {
store: self,
iter: self.index.range(range),
}
}
pub fn remove(&self, key: &Key) -> Result<()> {
let mut index_tx = self.index.start_transaction();
if let Some(value) = index_tx.get(key)? {
if value[0] == TOAST_VALUE_TAG {
let mut blobs_tx = self.blobs.start_transaction();
let toast_ref = ToastRef::des(&value[1..])?;
let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1)
/ TOAST_SEGMENT_SIZE) as u32;
for segno in 0..n_segments {
blobs_tx.remove(
&ToastSegId {
toast_id: toast_ref.toast_id,
segno,
}
.ser()?,
)?;
}
blobs_tx.commit()?;
}
index_tx.remove(key)?;
}
index_tx.commit()?;
Ok(())
}
fn detoast(&self, mut value: Value) -> Result<Value> {
if value[0] == TOAST_VALUE_TAG {
// TOAST chain
let toast_ref = ToastRef::des(&value[1..])?;
let mut toast: Value = Vec::with_capacity(toast_ref.orig_size as usize);
let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1)
/ TOAST_SEGMENT_SIZE) as u32;
let from = ToastSegId {
toast_id: toast_ref.toast_id,
segno: 0,
}
.ser()?;
let till = ToastSegId {
toast_id: toast_ref.toast_id,
segno: n_segments,
}
.ser()?;
for seg in self.blobs.range(from..till) {
toast.extend_from_slice(&seg?.1);
}
Ok(lz4_flex::decompress(&toast, toast_ref.orig_size as usize)?)
} else {
value.remove(0); // strip the value tag byte
Ok(value)
}
}
}
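
A hypothetical caller, assuming the API exactly as declared above (keys and values are Vec<u8>, per yakv's Key/Value aliases); a sketch, not part of the commit:

use std::path::Path;

fn demo() -> anyhow::Result<()> {
    let mut store = ToastStore::new(Path::new("/tmp/toast"))?;
    let key = b"rel/1/blk/0".to_vec();
    let big = vec![42u8; 16 * 1024]; // > 2KB, so it is sliced and lz4-compressed
    store.put(&key, &big)?;
    assert_eq!(store.get(&key)?.as_deref(), Some(&big[..])); // detoast reassembles it
    store.remove(&key)?; // drops the index entry and all of its blob segments
    Ok(())
}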