mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 19:50:38 +00:00
Compare commits
2 Commits
jemalloc-p
...
feature/de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ad829e3a76 | ||
|
|
5fd337a674 |
@@ -4,6 +4,7 @@ pub mod delta_layer;
|
||||
mod filename;
|
||||
mod image_layer;
|
||||
mod inmemory_layer;
|
||||
pub(crate) mod layer_contents;
|
||||
mod remote_layer;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
|
||||
@@ -30,6 +30,9 @@ use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::layer_contents::virtual_value::{
|
||||
VirtualValue, VirtualValueBuilder,
|
||||
};
|
||||
use crate::tenant::storage_layer::{
|
||||
PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
@@ -279,12 +282,12 @@ impl Layer for DeltaLayer {
|
||||
// A subroutine to dump a single blob
|
||||
let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
|
||||
let buf = cursor.read_blob(blob_ref.pos())?;
|
||||
let val = Value::des(&buf)?;
|
||||
let val = VirtualValue::des(&buf)?;
|
||||
let desc = match val {
|
||||
Value::Image(img) => {
|
||||
VirtualValue::NaturalImage(img) => {
|
||||
format!(" img {} bytes", img.len())
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
VirtualValue::NaturalWalRecord(rec) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec)?;
|
||||
format!(
|
||||
" rec {} bytes will_init: {} {}",
|
||||
@@ -293,6 +296,31 @@ impl Layer for DeltaLayer {
|
||||
wal_desc
|
||||
)
|
||||
}
|
||||
VirtualValue::ClosedLineage { image, lsns, .. } => {
|
||||
format!(
|
||||
" lin(closed,img) {} bytes max_lsn {} {} bytes tail {} recs",
|
||||
buf.len(),
|
||||
lsns.last().unwrap(),
|
||||
image.len(),
|
||||
lsns.len(),
|
||||
)
|
||||
}
|
||||
VirtualValue::ClosedRecLineage { lsns, .. } => {
|
||||
format!(
|
||||
" lin(closed,rec) {} bytes max_lsn {} {} recs",
|
||||
buf.len(),
|
||||
lsns.last().unwrap(),
|
||||
lsns.len() + 1,
|
||||
)
|
||||
}
|
||||
VirtualValue::OpenLineage { lsns, .. } => {
|
||||
format!(
|
||||
" lin(open) {} bytes max_lsn {} {} recs",
|
||||
buf.len(),
|
||||
lsns.last().unwrap(),
|
||||
lsns.len(),
|
||||
)
|
||||
}
|
||||
};
|
||||
Ok(desc)
|
||||
};
|
||||
@@ -350,10 +378,12 @@ impl Layer for DeltaLayer {
|
||||
return false;
|
||||
}
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
|
||||
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
!blob_ref.will_init()
|
||||
})?;
|
||||
@@ -368,28 +398,94 @@ impl Layer for DeltaLayer {
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
let val = Value::des(&buf).with_context(|| {
|
||||
let val = VirtualValue::des(&buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((entry_lsn, img));
|
||||
need_image = false;
|
||||
VirtualValue::NaturalImage(img) => {
|
||||
if lsn_range.contains(&entry_lsn) {
|
||||
reconstruct_state.img = Some((entry_lsn, img));
|
||||
need_image = false;
|
||||
}
|
||||
break;
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
VirtualValue::NaturalWalRecord(rec) => {
|
||||
if lsn_range.contains(&entry_lsn) {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
VirtualValue::ClosedLineage {
|
||||
image,
|
||||
lsns,
|
||||
records,
|
||||
} => {
|
||||
assert_eq!(lsns.len(), records.len());
|
||||
|
||||
for (lsn, rec) in
|
||||
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
|
||||
{
|
||||
if lsn_range.contains(&lsn) {
|
||||
reconstruct_state.records.push((lsn, rec));
|
||||
}
|
||||
}
|
||||
|
||||
if lsn_range.contains(&entry_lsn) {
|
||||
reconstruct_state.img = Some((entry_lsn, image));
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
VirtualValue::ClosedRecLineage {
|
||||
image_rec,
|
||||
lsns,
|
||||
records,
|
||||
} => {
|
||||
assert_eq!(lsns.len(), records.len());
|
||||
|
||||
for (lsn, rec) in
|
||||
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
|
||||
{
|
||||
if lsn_range.contains(&lsn) {
|
||||
reconstruct_state.records.push((lsn, rec));
|
||||
}
|
||||
}
|
||||
|
||||
if lsn_range.contains(&entry_lsn) {
|
||||
reconstruct_state.records.push((entry_lsn, image_rec));
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
VirtualValue::OpenLineage {
|
||||
mut lsns,
|
||||
mut records,
|
||||
} => {
|
||||
while let Some(lsn) = lsns.pop() {
|
||||
let rec = records.pop().unwrap();
|
||||
if lsn_range.contains(&lsn) {
|
||||
reconstruct_state.records.push((lsn, rec));
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(records.len(), 1);
|
||||
|
||||
if lsn_range.contains(&entry_lsn) {
|
||||
reconstruct_state
|
||||
.records
|
||||
.push((entry_lsn, records.pop().unwrap()));
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
// release metadata lock and close the file
|
||||
}
|
||||
@@ -682,6 +778,8 @@ struct DeltaLayerWriterInner {
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
|
||||
vvbuilder: Option<(Key, VirtualValueBuilder)>,
|
||||
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -724,6 +822,7 @@ impl DeltaLayerWriterInner {
|
||||
path,
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
vvbuilder: None,
|
||||
key_start,
|
||||
lsn_range,
|
||||
tree: tree_builder,
|
||||
@@ -736,8 +835,42 @@ impl DeltaLayerWriterInner {
|
||||
///
|
||||
/// The values must be appended in key, lsn order.
|
||||
///
|
||||
fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
|
||||
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
|
||||
fn put_value(&mut self, new_key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
|
||||
assert!(new_key >= self.key_start);
|
||||
assert!(self.lsn_range.contains(&lsn));
|
||||
|
||||
match self.vvbuilder.take() {
|
||||
None => {
|
||||
let mut builder = VirtualValueBuilder::new();
|
||||
let res = builder.push(lsn, val);
|
||||
assert!(res.is_none());
|
||||
self.vvbuilder = Some((new_key, builder));
|
||||
}
|
||||
Some((key, mut builder)) => {
|
||||
if key != new_key {
|
||||
let (lsn, vvalue) = builder.finish().unwrap();
|
||||
self.put_value_bytes(
|
||||
key,
|
||||
lsn,
|
||||
&VirtualValue::ser(&vvalue)?,
|
||||
vvalue.will_init(),
|
||||
)?;
|
||||
builder = VirtualValueBuilder::new();
|
||||
}
|
||||
|
||||
if let Some((old_lsn, vvalue)) = builder.push(lsn, val) {
|
||||
self.put_value_bytes(
|
||||
new_key,
|
||||
old_lsn,
|
||||
&VirtualValue::ser(&vvalue)?,
|
||||
vvalue.will_init(),
|
||||
)?;
|
||||
}
|
||||
|
||||
self.vvbuilder = Some((new_key, builder));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_value_bytes(
|
||||
@@ -766,7 +899,14 @@ impl DeltaLayerWriterInner {
|
||||
///
|
||||
/// Finish writing the delta layer.
|
||||
///
|
||||
fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
|
||||
fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
|
||||
// first, flush the last key's vvalue to disk.
|
||||
if let Some((key, builder)) = self.vvbuilder.take() {
|
||||
if let Some((lsn, vvalue)) = builder.finish() {
|
||||
self.put_value_bytes(key, lsn, &VirtualValue::ser(&vvalue)?, vvalue.will_init())?;
|
||||
};
|
||||
}
|
||||
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -937,7 +1077,7 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
|
||||
///
|
||||
/// Iterator over all key-value pairse stored in a delta layer
|
||||
/// Iterator over all key-value pairs stored in a delta layer
|
||||
///
|
||||
/// FIXME: This creates a Vector to hold the offsets of all key value pairs.
|
||||
/// That takes up quite a lot of memory. Should do this in a more streaming
|
||||
@@ -947,6 +1087,7 @@ struct DeltaValueIter<'a> {
|
||||
all_offsets: Vec<(DeltaKey, BlobRef)>,
|
||||
next_idx: usize,
|
||||
reader: BlockCursor<Adapter<'a>>,
|
||||
decode_queue: Option<<Vec<(Lsn, Value)> as IntoIterator>::IntoIter>,
|
||||
}
|
||||
|
||||
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
|
||||
@@ -990,12 +1131,21 @@ impl<'a> DeltaValueIter<'a> {
|
||||
all_offsets,
|
||||
next_idx: 0,
|
||||
reader: BlockCursor::new(Adapter(inner)),
|
||||
decode_queue: None,
|
||||
};
|
||||
|
||||
Ok(iter)
|
||||
}
|
||||
|
||||
fn next_res(&mut self) -> Result<Option<(Key, Lsn, Value)>> {
|
||||
if let Some(data) = &mut self.decode_queue {
|
||||
let res = data.next();
|
||||
if let Some((lsn, value)) = res {
|
||||
let key = self.all_offsets[self.next_idx - 1].0.key();
|
||||
return Ok(Some((key, lsn, value)));
|
||||
}
|
||||
}
|
||||
|
||||
if self.next_idx < self.all_offsets.len() {
|
||||
let (delta_key, blob_ref) = &self.all_offsets[self.next_idx];
|
||||
|
||||
@@ -1003,7 +1153,11 @@ impl<'a> DeltaValueIter<'a> {
|
||||
let lsn = delta_key.lsn();
|
||||
|
||||
let buf = self.reader.read_blob(blob_ref.pos())?;
|
||||
let val = Value::des(&buf)?;
|
||||
let val_vec = VirtualValue::des(&buf)?.into_value_vec(lsn);
|
||||
let mut iter = val_vec.into_iter();
|
||||
let Some((lsn, val)) = iter.next() else { bail!("missing data in VirtualValue") };
|
||||
self.decode_queue = Some(iter);
|
||||
|
||||
self.next_idx += 1;
|
||||
Ok(Some((key, lsn, val)))
|
||||
} else {
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
pub(crate) mod virtual_value;
|
||||
@@ -0,0 +1,237 @@
|
||||
use crate::repository::Value;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// VirtualValue is stored in some Layers instead of the normal repository::Value.
|
||||
///
|
||||
/// It describes one or more Values that are associated to one
|
||||
/// repository::Key, containing zero or one base image with all WAL records
|
||||
/// that need to apply to this preceding page image. In balanced tree-based
|
||||
/// indexes this reduces the number of full Keys we need to store, thus
|
||||
/// reducing the size of the layer's index and increasing cache efficiency.
|
||||
///
|
||||
/// Additionally, the abstraction paves the way to implement compression in the
|
||||
/// layer files themselves, as we'd just need to add a new variant to the
|
||||
/// VirtualValue type for compressed types. Examples of such optimizations
|
||||
/// are bitpacked and delta-encoded LSNs in the Lineage variants of this enum.
|
||||
///
|
||||
/// NOTE: Once committed into a hosted branch, these variants _must_ remain
|
||||
/// in this order, and cannot be removed - they are part of the specification
|
||||
/// of the physical layout of the DeltaLayer file. Any reordering is going to
|
||||
/// change the meaning of bytes in existing files and break the compatibility
|
||||
/// with old layers; so make sure you don't reorder these, nor should you
|
||||
/// update the layout of existing variants. You can update new variants as long
|
||||
/// as no user data is written using these variants.
|
||||
///
|
||||
/// NOTE: The first two variants are cloned over from repository::Value, which
|
||||
/// was the definition of the stored data in DeltaLayer before VirtualValue.
|
||||
/// These variants have the same layout and index, so they should (de)serialize
|
||||
/// into the same binary format, guaranteeing backwards compatibility.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum VirtualValue {
|
||||
/// NaturalImage: A natural WAL image, picked from PostgreSQL WAL.
|
||||
NaturalImage(Bytes),
|
||||
/// NaturalWalRecord: A natural WAL record, picked from PostgreSQL WAL.
|
||||
NaturalWalRecord(NeonWalRecord),
|
||||
/// ClosedLineage: A page image, followed by a set of WAL records that are
|
||||
/// applied to that page image.
|
||||
ClosedLineage {
|
||||
/// The base page image. Its LSN is not part of this value; callers
/// supply it separately (see `into_value_vec`).
image: Bytes,
|
||||
/// LSN of each entry in `records`, in the order the records were pushed.
/// Readers of this value expect `lsns.len() == records.len()`.
lsns: Vec<Lsn>,
|
||||
/// The WAL records applied on top of `image`; `records[i]` is at `lsns[i]`.
records: Vec<NeonWalRecord>,
|
||||
},
|
||||
/// ClosedRecLineage: A will-init WAL record, followed by a set of WAL
|
||||
/// records that are applied to the page image of the WAL record.
|
||||
ClosedRecLineage {
|
||||
/// The will-init record that starts the lineage. Its LSN is not part of
/// this value; callers supply it separately (see `into_value_vec`).
image_rec: NeonWalRecord,
|
||||
/// LSN of each entry in `records`, in the order the records were pushed.
/// Readers of this value expect `lsns.len() == records.len()`.
lsns: Vec<Lsn>,
|
||||
/// The WAL records that follow `image_rec`; `records[i]` is at `lsns[i]`.
records: Vec<NeonWalRecord>,
|
||||
},
|
||||
/// OpenLineage: A set of WAL records that are applied to the same page,
|
||||
/// but that do not have a known page image in this Layer.
|
||||
OpenLineage {
|
||||
/// LSNs of every record except the first one: `lsns[i]` belongs to
/// `records[i + 1]`. The LSN of `records[0]` is supplied separately.
lsns: Vec<Lsn>,
|
||||
/// All WAL records of the lineage, oldest pushed first. As built by
/// `VirtualValueBuilder`, this holds exactly one more entry than `lsns`.
records: Vec<NeonWalRecord>,
|
||||
},
|
||||
}
|
||||
|
||||
impl VirtualValue {
|
||||
pub(crate) fn into_value_vec(self, lsn: Lsn) -> Vec<(Lsn, Value)> {
|
||||
match self {
|
||||
VirtualValue::NaturalImage(img) => vec![(lsn, Value::Image(img))],
|
||||
VirtualValue::NaturalWalRecord(rec) => vec![(lsn, Value::WalRecord(rec))],
|
||||
VirtualValue::ClosedLineage {
|
||||
image,
|
||||
lsns,
|
||||
records,
|
||||
} => {
|
||||
let mut res = Vec::with_capacity(lsns.len() + 1);
|
||||
|
||||
res.push((lsn, Value::Image(image)));
|
||||
|
||||
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
|
||||
res.push((lsn, Value::WalRecord(rec)));
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
VirtualValue::ClosedRecLineage {
|
||||
image_rec,
|
||||
lsns,
|
||||
records,
|
||||
} => {
|
||||
let mut res = Vec::with_capacity(lsns.len() + 1);
|
||||
|
||||
res.push((lsn, Value::WalRecord(image_rec)));
|
||||
|
||||
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
|
||||
res.push((lsn, Value::WalRecord(rec)));
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
VirtualValue::OpenLineage { lsns, mut records } => {
|
||||
let mut res = Vec::with_capacity(lsns.len() + 1);
|
||||
let first_record = records.remove(0);
|
||||
|
||||
res.push((lsn, Value::WalRecord(first_record)));
|
||||
|
||||
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
|
||||
res.push((lsn, Value::WalRecord(rec)));
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn will_init(&self) -> bool {
|
||||
match self {
|
||||
VirtualValue::NaturalImage(_) => true,
|
||||
VirtualValue::NaturalWalRecord(rec) => rec.will_init(),
|
||||
VirtualValue::ClosedLineage { .. } => true,
|
||||
VirtualValue::ClosedRecLineage { .. } => true,
|
||||
VirtualValue::OpenLineage { .. } => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Value> for VirtualValue {
|
||||
fn from(value: Value) -> Self {
|
||||
match value {
|
||||
Value::Image(img) => VirtualValue::NaturalImage(img),
|
||||
Value::WalRecord(rec) => VirtualValue::NaturalWalRecord(rec),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "deconstruct the value using ::finish to make sure you don't lose intermediate values"]
|
||||
pub struct VirtualValueBuilder {
|
||||
state: Option<(Lsn, VirtualValue)>,
|
||||
}
|
||||
|
||||
impl VirtualValueBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self { state: None }
|
||||
}
|
||||
|
||||
#[must_use = "intermediate emitted values should be stored"]
|
||||
pub fn push(&mut self, new_lsn: Lsn, value: Value) -> Option<(Lsn, VirtualValue)> {
|
||||
if let Some((lsn, _)) = &self.state {
|
||||
assert!(new_lsn > *lsn);
|
||||
}
|
||||
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
let res = self.state.take();
|
||||
self.state = Some((new_lsn, VirtualValue::NaturalImage(img)));
|
||||
res
|
||||
}
|
||||
Value::WalRecord(new_rec) => {
|
||||
if new_rec.will_init() {
|
||||
let res = self.state.take();
|
||||
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
|
||||
return res;
|
||||
}
|
||||
|
||||
match self.state.take() {
|
||||
None => {
|
||||
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
|
||||
None
|
||||
}
|
||||
Some((start_lsn, virtual_value)) => {
|
||||
let new_vv = match virtual_value {
|
||||
VirtualValue::NaturalImage(img) => VirtualValue::ClosedLineage {
|
||||
image: img,
|
||||
lsns: vec![new_lsn],
|
||||
records: vec![new_rec],
|
||||
},
|
||||
VirtualValue::NaturalWalRecord(vv_start) => {
|
||||
if vv_start.will_init() {
|
||||
VirtualValue::ClosedRecLineage {
|
||||
image_rec: vv_start,
|
||||
lsns: vec![new_lsn],
|
||||
records: vec![new_rec],
|
||||
}
|
||||
} else {
|
||||
VirtualValue::OpenLineage {
|
||||
lsns: vec![new_lsn],
|
||||
records: vec![vv_start, new_rec],
|
||||
}
|
||||
}
|
||||
}
|
||||
VirtualValue::ClosedLineage {
|
||||
image,
|
||||
mut lsns,
|
||||
mut records,
|
||||
} => {
|
||||
lsns.push(new_lsn);
|
||||
records.push(new_rec);
|
||||
|
||||
VirtualValue::ClosedLineage {
|
||||
image,
|
||||
lsns,
|
||||
records,
|
||||
}
|
||||
}
|
||||
VirtualValue::ClosedRecLineage {
|
||||
image_rec,
|
||||
mut lsns,
|
||||
mut records,
|
||||
} => {
|
||||
lsns.push(new_lsn);
|
||||
records.push(new_rec);
|
||||
|
||||
VirtualValue::ClosedRecLineage {
|
||||
image_rec,
|
||||
lsns,
|
||||
records,
|
||||
}
|
||||
}
|
||||
VirtualValue::OpenLineage {
|
||||
mut lsns,
|
||||
mut records,
|
||||
} => {
|
||||
lsns.push(new_lsn);
|
||||
records.push(new_rec);
|
||||
|
||||
VirtualValue::OpenLineage { lsns, records }
|
||||
}
|
||||
};
|
||||
|
||||
self.state = Some((start_lsn, new_vv));
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn finish(mut self) -> Option<(Lsn, VirtualValue)> {
|
||||
self.state.take()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user