Compare commits

...

2 Commits

Author SHA1 Message Date
Matthias van de Meent
ad829e3a76 PR comment updates:
- Add docs on VirtualValue, variants.
- Use #[must_use] instead of `impl Drop` hack.
2023-04-27 18:39:53 +02:00
Matthias
5fd337a674 Improved storage of records in the delta layer btree
Before, we stored each WAL record separately in the main index. With this
change, the main btree only contains:

 - References to Images, and WAL records that apply on top of them
 - References to sequences of WAL records that apply to the same page, but
   don't have an image in this Layer

This reduces the size of the index by some amount, and thus increases the cache-ability of that index.

Unless we're not looking for the latest version of a page or otherwise limit
the lookup window, this change does not significantly impact IO requirements
for normal workloads, as we don't (yet) add any compression to this WAL.
2023-04-27 18:39:53 +02:00
4 changed files with 412 additions and 19 deletions

View File

@@ -4,6 +4,7 @@ pub mod delta_layer;
mod filename;
mod image_layer;
mod inmemory_layer;
pub(crate) mod layer_contents;
mod remote_layer;
use crate::config::PageServerConf;

View File

@@ -30,6 +30,9 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::layer_contents::virtual_value::{
VirtualValue, VirtualValueBuilder,
};
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -279,12 +282,12 @@ impl Layer for DeltaLayer {
// A subroutine to dump a single blob
let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
let buf = cursor.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let val = VirtualValue::des(&buf)?;
let desc = match val {
Value::Image(img) => {
VirtualValue::NaturalImage(img) => {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
VirtualValue::NaturalWalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
@@ -293,6 +296,31 @@ impl Layer for DeltaLayer {
wal_desc
)
}
VirtualValue::ClosedLineage { image, lsns, .. } => {
format!(
" lin(closed,img) {} bytes max_lsn {} {} bytes tail {} recs",
buf.len(),
lsns.last().unwrap(),
image.len(),
lsns.len(),
)
}
VirtualValue::ClosedRecLineage { lsns, .. } => {
format!(
" lin(closed,rec) {} bytes max_lsn {} {} recs",
buf.len(),
lsns.last().unwrap(),
lsns.len() + 1,
)
}
VirtualValue::OpenLineage { lsns, .. } => {
format!(
" lin(open) {} bytes max_lsn {} {} recs",
buf.len(),
lsns.last().unwrap(),
lsns.len(),
)
}
};
Ok(desc)
};
@@ -350,10 +378,12 @@ impl Layer for DeltaLayer {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
offsets.push((entry_lsn, blob_ref.pos()));
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
@@ -368,28 +398,94 @@ impl Layer for DeltaLayer {
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
let val = VirtualValue::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
VirtualValue::NaturalImage(img) => {
if lsn_range.contains(&entry_lsn) {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
}
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
VirtualValue::NaturalWalRecord(rec) => {
if lsn_range.contains(&entry_lsn) {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
VirtualValue::ClosedLineage {
image,
lsns,
records,
} => {
assert_eq!(lsns.len(), records.len());
for (lsn, rec) in
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
{
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
if lsn_range.contains(&entry_lsn) {
reconstruct_state.img = Some((entry_lsn, image));
need_image = false;
break;
}
}
}
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
} => {
assert_eq!(lsns.len(), records.len());
for (lsn, rec) in
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
{
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
if lsn_range.contains(&entry_lsn) {
reconstruct_state.records.push((entry_lsn, image_rec));
need_image = false;
break;
}
}
VirtualValue::OpenLineage {
mut lsns,
mut records,
} => {
while let Some(lsn) = lsns.pop() {
let rec = records.pop().unwrap();
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
assert_eq!(records.len(), 1);
if lsn_range.contains(&entry_lsn) {
reconstruct_state
.records
.push((entry_lsn, records.pop().unwrap()));
}
}
};
}
// release metadata lock and close the file
}
@@ -682,6 +778,8 @@ struct DeltaLayerWriterInner {
timeline_id: TimelineId,
tenant_id: TenantId,
vvbuilder: Option<(Key, VirtualValueBuilder)>,
key_start: Key,
lsn_range: Range<Lsn>,
@@ -724,6 +822,7 @@ impl DeltaLayerWriterInner {
path,
timeline_id,
tenant_id,
vvbuilder: None,
key_start,
lsn_range,
tree: tree_builder,
@@ -736,8 +835,42 @@ impl DeltaLayerWriterInner {
///
/// The values must be appended in key, lsn order.
///
fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
fn put_value(&mut self, new_key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
assert!(new_key >= self.key_start);
assert!(self.lsn_range.contains(&lsn));
match self.vvbuilder.take() {
None => {
let mut builder = VirtualValueBuilder::new();
let res = builder.push(lsn, val);
assert!(res.is_none());
self.vvbuilder = Some((new_key, builder));
}
Some((key, mut builder)) => {
if key != new_key {
let (lsn, vvalue) = builder.finish().unwrap();
self.put_value_bytes(
key,
lsn,
&VirtualValue::ser(&vvalue)?,
vvalue.will_init(),
)?;
builder = VirtualValueBuilder::new();
}
if let Some((old_lsn, vvalue)) = builder.push(lsn, val) {
self.put_value_bytes(
new_key,
old_lsn,
&VirtualValue::ser(&vvalue)?,
vvalue.will_init(),
)?;
}
self.vvbuilder = Some((new_key, builder));
}
}
Ok(())
}
fn put_value_bytes(
@@ -766,7 +899,14 @@ impl DeltaLayerWriterInner {
///
/// Finish writing the delta layer.
///
fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
// first, flush the last key's vvalue to disk.
if let Some((key, builder)) = self.vvbuilder.take() {
if let Some((lsn, vvalue)) = builder.finish() {
self.put_value_bytes(key, lsn, &VirtualValue::ser(&vvalue)?, vvalue.will_init())?;
};
}
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -937,7 +1077,7 @@ impl Drop for DeltaLayerWriter {
}
///
/// Iterator over all key-value pairse stored in a delta layer
/// Iterator over all key-value pairs stored in a delta layer
///
/// FIXME: This creates a Vector to hold the offsets of all key value pairs.
/// That takes up quite a lot of memory. Should do this in a more streaming
@@ -947,6 +1087,7 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decode_queue: Option<<Vec<(Lsn, Value)> as IntoIterator>::IntoIter>,
}
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -990,12 +1131,21 @@ impl<'a> DeltaValueIter<'a> {
all_offsets,
next_idx: 0,
reader: BlockCursor::new(Adapter(inner)),
decode_queue: None,
};
Ok(iter)
}
fn next_res(&mut self) -> Result<Option<(Key, Lsn, Value)>> {
if let Some(data) = &mut self.decode_queue {
let res = data.next();
if let Some((lsn, value)) = res {
let key = self.all_offsets[self.next_idx - 1].0.key();
return Ok(Some((key, lsn, value)));
}
}
if self.next_idx < self.all_offsets.len() {
let (delta_key, blob_ref) = &self.all_offsets[self.next_idx];
@@ -1003,7 +1153,11 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let val_vec = VirtualValue::des(&buf)?.into_value_vec(lsn);
let mut iter = val_vec.into_iter();
let Some((lsn, val)) = iter.next() else { bail!("missing data in VirtualValue") };
self.decode_queue = Some(iter);
self.next_idx += 1;
Ok(Some((key, lsn, val)))
} else {

View File

@@ -0,0 +1 @@
pub(crate) mod virtual_value;

View File

@@ -0,0 +1,237 @@
use crate::repository::Value;
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
/// VirtualValue is stored in some Layers instead of the normal repository::Value.
///
/// It describes one or more Values that are associated to one
/// repostiory::Key, containing zero or one base image with all WAL records
/// that need to apply to this preceding page image. In balanced tree-based
/// indexes this reduces the number of full Keys we need to store, thus
/// reducing the size of the layer's index and increasing cache efficiency.
///
/// Additionally, the abstraction paves the way to implement compression in the
/// layer file themselves, as we'd just need to add a new variant to the
/// VirtualValue type for compressed types. Examples of such optimizations
/// are bitpacked and delta-encoded LSNs in the Lineage variants of this enum.
///
/// NOTE: Once committed into a hosted branch, these variants _must_ remain
/// in this order, and cannot be removed - they are part of the specification
/// of the physical layout of the DeltaLayer file. Any reordering is going to
/// change the meaning of bytes in existing files and break the compatibility
/// with old layers; so make sure you don't reorder these, nor should you
/// update the layout of existing variants. You can update new variants as long
/// as no user data is written using these variants.
///
/// NOTE: The first two variants are cloned over from repository::Value, which
/// was the definition of the stored data in DeltaLayer before VirtualValue.
/// These variants have the same layout and index, so they should (de)serialize
/// into the same binary format, guaranteeing backwards compatibility.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum VirtualValue {
/// NaturalImage: A natural WAL image, picked from PostgreSQL WAL.
NaturalImage(Bytes),
/// NaturalWalRecord: A natural WAL record, picked from PostgreSQL WAL.
NaturalWalRecord(NeonWalRecord),
/// ClosedLineage: A page image, followed by a set of WAL records that are
/// applied to that page image.
ClosedLineage {
image: Bytes,
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
/// ClosedRecLineage: A will-init WAL record, followed by a set of WAL
/// records that are applied to the page image of the WAL record.
ClosedRecLineage {
image_rec: NeonWalRecord,
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
/// OpenLineage: A set of WAL records that are applied to the same page,
/// but that do not have a known page image in this Layer.
OpenLineage {
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
}
impl VirtualValue {
pub(crate) fn into_value_vec(self, lsn: Lsn) -> Vec<(Lsn, Value)> {
match self {
VirtualValue::NaturalImage(img) => vec![(lsn, Value::Image(img))],
VirtualValue::NaturalWalRecord(rec) => vec![(lsn, Value::WalRecord(rec))],
VirtualValue::ClosedLineage {
image,
lsns,
records,
} => {
let mut res = Vec::with_capacity(lsns.len() + 1);
res.push((lsn, Value::Image(image)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
} => {
let mut res = Vec::with_capacity(lsns.len() + 1);
res.push((lsn, Value::WalRecord(image_rec)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
VirtualValue::OpenLineage { lsns, mut records } => {
let mut res = Vec::with_capacity(lsns.len() + 1);
let first_record = records.remove(0);
res.push((lsn, Value::WalRecord(first_record)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
}
}
pub(crate) fn will_init(&self) -> bool {
match self {
VirtualValue::NaturalImage(_) => true,
VirtualValue::NaturalWalRecord(rec) => rec.will_init(),
VirtualValue::ClosedLineage { .. } => true,
VirtualValue::ClosedRecLineage { .. } => true,
VirtualValue::OpenLineage { .. } => false,
}
}
}
impl From<Value> for VirtualValue {
fn from(value: Value) -> Self {
match value {
Value::Image(img) => VirtualValue::NaturalImage(img),
Value::WalRecord(rec) => VirtualValue::NaturalWalRecord(rec),
}
}
}
#[must_use = "deconstruct the value using ::finish to make sure you don't lose intermediate values"]
pub struct VirtualValueBuilder {
state: Option<(Lsn, VirtualValue)>,
}
impl VirtualValueBuilder {
pub fn new() -> Self {
Self { state: None }
}
#[must_use = "intermediate emitted values should be stored"]
pub fn push(&mut self, new_lsn: Lsn, value: Value) -> Option<(Lsn, VirtualValue)> {
if let Some((lsn, _)) = &self.state {
assert!(new_lsn > *lsn);
}
match value {
Value::Image(img) => {
let res = self.state.take();
self.state = Some((new_lsn, VirtualValue::NaturalImage(img)));
res
}
Value::WalRecord(new_rec) => {
if new_rec.will_init() {
let res = self.state.take();
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
return res;
}
match self.state.take() {
None => {
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
None
}
Some((start_lsn, virtual_value)) => {
let new_vv = match virtual_value {
VirtualValue::NaturalImage(img) => VirtualValue::ClosedLineage {
image: img,
lsns: vec![new_lsn],
records: vec![new_rec],
},
VirtualValue::NaturalWalRecord(vv_start) => {
if vv_start.will_init() {
VirtualValue::ClosedRecLineage {
image_rec: vv_start,
lsns: vec![new_lsn],
records: vec![new_rec],
}
} else {
VirtualValue::OpenLineage {
lsns: vec![new_lsn],
records: vec![vv_start, new_rec],
}
}
}
VirtualValue::ClosedLineage {
image,
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::ClosedLineage {
image,
lsns,
records,
}
}
VirtualValue::ClosedRecLineage {
image_rec,
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
}
}
VirtualValue::OpenLineage {
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::OpenLineage { lsns, records }
}
};
self.state = Some((start_lsn, new_vv));
None
}
}
}
}
}
#[must_use]
pub fn finish(mut self) -> Option<(Lsn, VirtualValue)> {
self.state.take()
}
}