Improved storage of records in the delta layer btree

Before, we stored each WAL record separately in the main index. With this
change, the main btree only contains:

 - References to Images, and WAL records that apply on top of them
 - References to sequences of WAL records that apply to the same page, but
   don't have an image in this Layer

This reduces the size of the index by some amount, and thus increases the cache-ability of that index.

Unless we're not looking for the latest version of a page or otherwise limit
the lookup window, this change does not significantly impact IO requirements
for normal workloads, as we don't (yet) add any compression to this WAL.
This commit is contained in:
Matthias
2023-04-20 17:53:27 +03:00
committed by Matthias van de Meent
parent c4e1cafb63
commit 5fd337a674
4 changed files with 382 additions and 19 deletions

View File

@@ -4,6 +4,7 @@ pub mod delta_layer;
mod filename;
mod image_layer;
mod inmemory_layer;
pub(crate) mod layer_contents;
mod remote_layer;
use crate::config::PageServerConf;

View File

@@ -30,6 +30,9 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::layer_contents::virtual_value::{
VirtualValue, VirtualValueBuilder,
};
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -279,12 +282,12 @@ impl Layer for DeltaLayer {
// A subroutine to dump a single blob
let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
let buf = cursor.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let val = VirtualValue::des(&buf)?;
let desc = match val {
Value::Image(img) => {
VirtualValue::NaturalImage(img) => {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
VirtualValue::NaturalWalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
@@ -293,6 +296,31 @@ impl Layer for DeltaLayer {
wal_desc
)
}
VirtualValue::ClosedLineage { image, lsns, .. } => {
format!(
" lin(closed,img) {} bytes max_lsn {} {} bytes tail {} recs",
buf.len(),
lsns.last().unwrap(),
image.len(),
lsns.len(),
)
}
VirtualValue::ClosedRecLineage { lsns, .. } => {
format!(
" lin(closed,rec) {} bytes max_lsn {} {} recs",
buf.len(),
lsns.last().unwrap(),
lsns.len() + 1,
)
}
VirtualValue::OpenLineage { lsns, .. } => {
format!(
" lin(open) {} bytes max_lsn {} {} recs",
buf.len(),
lsns.last().unwrap(),
lsns.len(),
)
}
};
Ok(desc)
};
@@ -350,10 +378,12 @@ impl Layer for DeltaLayer {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
offsets.push((entry_lsn, blob_ref.pos()));
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
@@ -368,28 +398,94 @@ impl Layer for DeltaLayer {
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
let val = VirtualValue::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
VirtualValue::NaturalImage(img) => {
if lsn_range.contains(&entry_lsn) {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
}
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
VirtualValue::NaturalWalRecord(rec) => {
if lsn_range.contains(&entry_lsn) {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
VirtualValue::ClosedLineage {
image,
lsns,
records,
} => {
assert_eq!(lsns.len(), records.len());
for (lsn, rec) in
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
{
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
if lsn_range.contains(&entry_lsn) {
reconstruct_state.img = Some((entry_lsn, image));
need_image = false;
break;
}
}
}
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
} => {
assert_eq!(lsns.len(), records.len());
for (lsn, rec) in
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
{
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
if lsn_range.contains(&entry_lsn) {
reconstruct_state.records.push((entry_lsn, image_rec));
need_image = false;
break;
}
}
VirtualValue::OpenLineage {
mut lsns,
mut records,
} => {
while let Some(lsn) = lsns.pop() {
let rec = records.pop().unwrap();
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
assert_eq!(records.len(), 1);
if lsn_range.contains(&entry_lsn) {
reconstruct_state
.records
.push((entry_lsn, records.pop().unwrap()));
}
}
};
}
// release metadata lock and close the file
}
@@ -682,6 +778,8 @@ struct DeltaLayerWriterInner {
timeline_id: TimelineId,
tenant_id: TenantId,
vvbuilder: Option<(Key, VirtualValueBuilder)>,
key_start: Key,
lsn_range: Range<Lsn>,
@@ -724,6 +822,7 @@ impl DeltaLayerWriterInner {
path,
timeline_id,
tenant_id,
vvbuilder: None,
key_start,
lsn_range,
tree: tree_builder,
@@ -736,8 +835,42 @@ impl DeltaLayerWriterInner {
///
/// The values must be appended in key, lsn order.
///
fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
fn put_value(&mut self, new_key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
assert!(new_key >= self.key_start);
assert!(self.lsn_range.contains(&lsn));
match self.vvbuilder.take() {
None => {
let mut builder = VirtualValueBuilder::new();
let res = builder.push(lsn, val);
assert!(res.is_none());
self.vvbuilder = Some((new_key, builder));
}
Some((key, mut builder)) => {
if key != new_key {
let (lsn, vvalue) = builder.finish().unwrap();
self.put_value_bytes(
key,
lsn,
&VirtualValue::ser(&vvalue)?,
vvalue.will_init(),
)?;
builder = VirtualValueBuilder::new();
}
if let Some((old_lsn, vvalue)) = builder.push(lsn, val) {
self.put_value_bytes(
new_key,
old_lsn,
&VirtualValue::ser(&vvalue)?,
vvalue.will_init(),
)?;
}
self.vvbuilder = Some((new_key, builder));
}
}
Ok(())
}
fn put_value_bytes(
@@ -766,7 +899,14 @@ impl DeltaLayerWriterInner {
///
/// Finish writing the delta layer.
///
fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
// first, flush the last key's vvalue to disk.
if let Some((key, builder)) = self.vvbuilder.take() {
if let Some((lsn, vvalue)) = builder.finish() {
self.put_value_bytes(key, lsn, &VirtualValue::ser(&vvalue)?, vvalue.will_init())?;
};
}
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -937,7 +1077,7 @@ impl Drop for DeltaLayerWriter {
}
///
/// Iterator over all key-value pairse stored in a delta layer
/// Iterator over all key-value pairs stored in a delta layer
///
/// FIXME: This creates a Vector to hold the offsets of all key value pairs.
/// That takes up quite a lot of memory. Should do this in a more streaming
@@ -947,6 +1087,7 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decode_queue: Option<<Vec<(Lsn, Value)> as IntoIterator>::IntoIter>,
}
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -990,12 +1131,21 @@ impl<'a> DeltaValueIter<'a> {
all_offsets,
next_idx: 0,
reader: BlockCursor::new(Adapter(inner)),
decode_queue: None,
};
Ok(iter)
}
fn next_res(&mut self) -> Result<Option<(Key, Lsn, Value)>> {
if let Some(data) = &mut self.decode_queue {
let res = data.next();
if let Some((lsn, value)) = res {
let key = self.all_offsets[self.next_idx - 1].0.key();
return Ok(Some((key, lsn, value)));
}
}
if self.next_idx < self.all_offsets.len() {
let (delta_key, blob_ref) = &self.all_offsets[self.next_idx];
@@ -1003,7 +1153,11 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let val_vec = VirtualValue::des(&buf)?.into_value_vec(lsn);
let mut iter = val_vec.into_iter();
let Some((lsn, val)) = iter.next() else { bail!("missing data in VirtualValue") };
self.decode_queue = Some(iter);
self.next_idx += 1;
Ok(Some((key, lsn, val)))
} else {

View File

@@ -0,0 +1 @@
pub(crate) mod virtual_value;

View File

@@ -0,0 +1,207 @@
use crate::repository::Value;
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum VirtualValue {
NaturalImage(Bytes),
NaturalWalRecord(NeonWalRecord),
ClosedLineage {
image: Bytes,
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
ClosedRecLineage {
image_rec: NeonWalRecord,
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
OpenLineage {
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
}
impl VirtualValue {
pub(crate) fn into_value_vec(self, lsn: Lsn) -> Vec<(Lsn, Value)> {
match self {
VirtualValue::NaturalImage(img) => vec![(lsn, Value::Image(img))],
VirtualValue::NaturalWalRecord(rec) => vec![(lsn, Value::WalRecord(rec))],
VirtualValue::ClosedLineage {
image,
lsns,
records,
} => {
let mut res = Vec::with_capacity(lsns.len() + 1);
res.push((lsn, Value::Image(image)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
} => {
let mut res = Vec::with_capacity(lsns.len() + 1);
res.push((lsn, Value::WalRecord(image_rec)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
VirtualValue::OpenLineage { lsns, mut records } => {
let mut res = Vec::with_capacity(lsns.len() + 1);
let first_record = records.remove(0);
res.push((lsn, Value::WalRecord(first_record)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
}
}
pub(crate) fn will_init(&self) -> bool {
match self {
VirtualValue::NaturalImage(_) => true,
VirtualValue::NaturalWalRecord(rec) => rec.will_init(),
VirtualValue::ClosedLineage { .. } => true,
VirtualValue::ClosedRecLineage { .. } => true,
VirtualValue::OpenLineage { .. } => false,
}
}
}
impl From<Value> for VirtualValue {
fn from(value: Value) -> Self {
match value {
Value::Image(img) => VirtualValue::NaturalImage(img),
Value::WalRecord(rec) => VirtualValue::NaturalWalRecord(rec),
}
}
}
pub struct VirtualValueBuilder {
state: Option<(Lsn, VirtualValue)>,
}
impl VirtualValueBuilder {
pub fn new() -> Self {
Self { state: None }
}
pub fn push(&mut self, new_lsn: Lsn, value: Value) -> Option<(Lsn, VirtualValue)> {
if let Some((lsn, _)) = &self.state {
assert!(new_lsn > *lsn);
}
match value {
Value::Image(img) => {
let res = self.state.take();
self.state = Some((new_lsn, VirtualValue::NaturalImage(img)));
res
}
Value::WalRecord(new_rec) => {
if new_rec.will_init() {
let res = self.state.take();
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
return res;
}
match self.state.take() {
None => {
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
None
}
Some((start_lsn, virtual_value)) => {
let new_vv = match virtual_value {
VirtualValue::NaturalImage(img) => VirtualValue::ClosedLineage {
image: img,
lsns: vec![new_lsn],
records: vec![new_rec],
},
VirtualValue::NaturalWalRecord(vv_start) => {
if vv_start.will_init() {
VirtualValue::ClosedRecLineage {
image_rec: vv_start,
lsns: vec![new_lsn],
records: vec![new_rec],
}
} else {
VirtualValue::OpenLineage {
lsns: vec![new_lsn],
records: vec![vv_start, new_rec],
}
}
}
VirtualValue::ClosedLineage {
image,
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::ClosedLineage {
image,
lsns,
records,
}
}
VirtualValue::ClosedRecLineage {
image_rec,
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
}
}
VirtualValue::OpenLineage {
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::OpenLineage { lsns, records }
}
};
self.state = Some((start_lsn, new_vv));
None
}
}
}
}
}
pub fn finish(mut self) -> Option<(Lsn, VirtualValue)> {
self.state.take()
}
}
impl Drop for VirtualValueBuilder {
fn drop(&mut self) {
assert!(self.state.is_none());
}
}