Change toast store API

This commit is contained in:
Konstantin Knizhnik
2021-11-23 11:34:32 +03:00
parent 915001c67e
commit 92562145c0
2 changed files with 19 additions and 22 deletions

View File

@@ -1489,7 +1489,7 @@ impl BufferedTimeline {
});
let mut store = self.store.write().unwrap();
store.data.put(&key, &PageVersion::Page(img?).ser()?)?;
store.data.put(key, PageVersion::Page(img?).ser()?)?;
n_checkpointed_records += 1;
}
}
@@ -1615,7 +1615,7 @@ impl BufferedTimeline {
// then drop previus version as it is not needed any more
drop(store);
let mut store = self.store.write().unwrap();
store.data.remove(&prev_key)?;
store.data.remove(prev_key)?;
result.meta_removed += 1;
// We should reset iterator and start from the current point
continue 'meta;
@@ -1626,7 +1626,7 @@ impl BufferedTimeline {
// object was dropped, so we can immediately remove deteriorated version
drop(store);
let mut store = self.store.write().unwrap();
store.data.remove(&raw_key)?;
store.data.remove(raw_key)?;
dropped.insert(dk.rel);
result.meta_dropped += 1;
// We should reset iterator and start from the current point
@@ -1658,15 +1658,13 @@ impl BufferedTimeline {
lsn: Lsn(0),
});
// Array to accumulate keys we can remove.
// Place it outside main loop to reduce number of dynamic memory allocations
let mut deteriorated: Vec<yakv::storage::Key> = Vec::new();
// currently proceed block number
let mut from_blknum = 0;
'pages: loop {
let store = self.store.read().unwrap();
let iter = store.data.range(&from.ser()?..);
deteriorated.clear();
// Array to accumulate keys we can remove.
let mut deteriorated: Vec<yakv::storage::Key> = Vec::new();
for entry in iter {
let pair = entry?;
let raw_key = pair.0;
@@ -1718,10 +1716,10 @@ impl BufferedTimeline {
// ... then remove all previously accumulated deltas and images, as them are not needed any more
drop(store);
let mut store = self.store.write().unwrap();
for key in deteriorated.iter() {
result.pages_removed += deteriorated.len() as u64;
for key in deteriorated {
store.data.remove(key)?;
}
result.pages_removed += deteriorated.len() as u64;
// We should reset iterator and start from the current point
continue 'pages;
}
@@ -1737,7 +1735,7 @@ impl BufferedTimeline {
drop(store);
let mut store = self.store.write().unwrap();
// We should reset iterator and start from the current point
store.data.remove(&raw_key)?;
store.data.remove(raw_key)?;
result.pages_dropped += 1;
continue 'pages;
}
@@ -1856,7 +1854,7 @@ impl<'a> BufferedTimelineWriter<'a> {
ensure!(lsn.is_aligned(), "unaligned record LSN");
let key = StoreKey::Data(DataKey { rel, blknum, lsn });
let mut store = self.tl.store.write().unwrap();
store.data.put(&key.ser()?, &ver.ser()?)?;
store.data.put(key.ser()?, ver.ser()?)?;
store.brin.insert(
BrinTag {
rel,
@@ -1880,7 +1878,7 @@ impl<'a> BufferedTimelineWriter<'a> {
let mv = MetadataValue {
size: Some(blknum + 1),
};
store.data.put(&mk.ser()?, &mv.ser()?)?;
store.data.put(mk.ser()?, mv.ser()?)?;
/* Looks like we do not need to explicitly fill gap, because we in any case have to handle situation when
* page in accessed before been wal logged
// Fill gap with zero pages
@@ -1979,7 +1977,7 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
let mv = MetadataValue {
size: Some(relsize),
};
store.data.put(&mk.ser()?, &mv.ser()?)?;
store.data.put(mk.ser()?, mv.ser()?)?;
Ok(())
}
@@ -1991,7 +1989,7 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
meta_hash.remove(&rel);
let mk = StoreKey::Metadata(MetadataKey { rel, lsn });
let mv = MetadataValue { size: None }; // None indicates dropped relation
store.data.put(&mk.ser()?, &mv.ser()?)?;
store.data.put(mk.ser()?, mv.ser()?)?;
Ok(())
}

View File

@@ -131,13 +131,12 @@ impl ToastStore {
})
}
pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
pub fn put(&mut self, key: Key, value: Value) -> Result<()> {
let mut tx = self.db.start_transaction();
let value_len = value.len();
let mut key = key.to_vec();
let value = &value.to_vec();
let mut key = key;
if value_len >= TOAST_SEGMENT_SIZE {
let compressed_data = lz4_flex::compress_prepend_size(value);
let compressed_data = lz4_flex::compress_prepend_size(&value);
let compressed_data_len = compressed_data.len();
let mut offs: usize = 0;
let mut segno = 0u16;
@@ -162,7 +161,7 @@ impl ToastStore {
}
} else {
key.extend_from_slice(&[0u8; 4]);
tx.put(&key, value)?;
tx.put(&key, &value)?;
}
tx.delay()?;
Ok(())
@@ -211,10 +210,10 @@ impl ToastStore {
}
}
pub fn remove(&mut self, key: &[u8]) -> Result<()> {
pub fn remove(&mut self, key: Key) -> Result<()> {
let mut tx = self.db.start_transaction();
let mut min_key = key.to_vec();
let mut max_key = key.to_vec();
let mut min_key = key.clone();
let mut max_key = key;
min_key.extend_from_slice(&[0u8; 4]);
max_key.extend_from_slice(&[0xFFu8; 4]);
let mut iter = tx.range(&min_key..&max_key);