diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 2ee723e7c3..d6a555d934 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -4,6 +4,7 @@ pub mod delta_layer; mod filename; mod image_layer; mod inmemory_layer; +pub(crate) mod layer_contents; mod remote_layer; use crate::config::PageServerConf; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 98cbcc5f07..59aef83fbd 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -30,6 +30,9 @@ use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; +use crate::tenant::storage_layer::layer_contents::virtual_value::{ + VirtualValue, VirtualValueBuilder, +}; use crate::tenant::storage_layer::{ PersistentLayer, ValueReconstructResult, ValueReconstructState, }; @@ -279,12 +282,12 @@ impl Layer for DeltaLayer { // A subroutine to dump a single blob let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result { let buf = cursor.read_blob(blob_ref.pos())?; - let val = Value::des(&buf)?; + let val = VirtualValue::des(&buf)?; let desc = match val { - Value::Image(img) => { + VirtualValue::NaturalImage(img) => { format!(" img {} bytes", img.len()) } - Value::WalRecord(rec) => { + VirtualValue::NaturalWalRecord(rec) => { let wal_desc = walrecord::describe_wal_record(&rec)?; format!( " rec {} bytes will_init: {} {}", @@ -293,6 +296,31 @@ impl Layer for DeltaLayer { wal_desc ) } + VirtualValue::ClosedLineage { image, lsns, .. 
} => { + format!( + " lin(closed,img) {} bytes max_lsn {} {} bytes tail {} recs", + buf.len(), + lsns.last().unwrap(), + image.len(), + lsns.len(), + ) + } + VirtualValue::ClosedRecLineage { lsns, .. } => { + format!( + " lin(closed,rec) {} bytes max_lsn {} {} recs", + buf.len(), + lsns.last().unwrap(), + lsns.len() + 1, + ) + } + VirtualValue::OpenLineage { lsns, .. } => { + format!( + " lin(open) {} bytes max_lsn {} {} recs", + buf.len(), + lsns.last().unwrap(), + lsns.len(), + ) + } }; Ok(desc) }; @@ -350,10 +378,12 @@ impl Layer for DeltaLayer { return false; } let entry_lsn = DeltaKey::extract_lsn_from_buf(key); + + offsets.push((entry_lsn, blob_ref.pos())); + if entry_lsn < lsn_range.start { return false; } - offsets.push((entry_lsn, blob_ref.pos())); !blob_ref.will_init() })?; @@ -368,28 +398,94 @@ impl Layer for DeltaLayer { file.file.path.display() ) })?; - let val = Value::des(&buf).with_context(|| { + let val = VirtualValue::des(&buf).with_context(|| { format!( "Failed to deserialize file blob from virtual file {}", file.file.path.display() ) })?; + match val { - Value::Image(img) => { - reconstruct_state.img = Some((entry_lsn, img)); - need_image = false; + VirtualValue::NaturalImage(img) => { + if lsn_range.contains(&entry_lsn) { + reconstruct_state.img = Some((entry_lsn, img)); + need_image = false; + } break; } - Value::WalRecord(rec) => { - let will_init = rec.will_init(); - reconstruct_state.records.push((entry_lsn, rec)); - if will_init { - // This WAL record initializes the page, so no need to go further back + VirtualValue::NaturalWalRecord(rec) => { + if lsn_range.contains(&entry_lsn) { + let will_init = rec.will_init(); + reconstruct_state.records.push((entry_lsn, rec)); + if will_init { + // This WAL record initializes the page, so no need to go further back + need_image = false; + break; + } + } + } + VirtualValue::ClosedLineage { + image, + lsns, + records, + } => { + assert_eq!(lsns.len(), records.len()); + + for (lsn, rec) in + 
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev()) + { + if lsn_range.contains(&lsn) { + reconstruct_state.records.push((lsn, rec)); + } + } + + if lsn_range.contains(&entry_lsn) { + reconstruct_state.img = Some((entry_lsn, image)); need_image = false; break; } } - } + VirtualValue::ClosedRecLineage { + image_rec, + lsns, + records, + } => { + assert_eq!(lsns.len(), records.len()); + + for (lsn, rec) in + Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev()) + { + if lsn_range.contains(&lsn) { + reconstruct_state.records.push((lsn, rec)); + } + } + + if lsn_range.contains(&entry_lsn) { + reconstruct_state.records.push((entry_lsn, image_rec)); + need_image = false; + break; + } + } + VirtualValue::OpenLineage { + mut lsns, + mut records, + } => { + while let Some(lsn) = lsns.pop() { + let rec = records.pop().unwrap(); + if lsn_range.contains(&lsn) { + reconstruct_state.records.push((lsn, rec)); + } + } + + assert_eq!(records.len(), 1); + + if lsn_range.contains(&entry_lsn) { + reconstruct_state + .records + .push((entry_lsn, records.pop().unwrap())); + } + } + }; } // release metadata lock and close the file } @@ -682,6 +778,8 @@ struct DeltaLayerWriterInner { timeline_id: TimelineId, tenant_id: TenantId, + vvbuilder: Option<(Key, VirtualValueBuilder)>, + key_start: Key, lsn_range: Range, @@ -724,6 +822,7 @@ impl DeltaLayerWriterInner { path, timeline_id, tenant_id, + vvbuilder: None, key_start, lsn_range, tree: tree_builder, @@ -736,8 +835,42 @@ impl DeltaLayerWriterInner { /// /// The values must be appended in key, lsn order. 
/// - fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { - self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init()) + fn put_value(&mut self, new_key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { + assert!(new_key >= self.key_start); + assert!(self.lsn_range.contains(&lsn)); + + match self.vvbuilder.take() { + None => { + let mut builder = VirtualValueBuilder::new(); + let res = builder.push(lsn, val); + assert!(res.is_none()); + self.vvbuilder = Some((new_key, builder)); + } + Some((key, mut builder)) => { + if key != new_key { + let (lsn, vvalue) = builder.finish().unwrap(); + self.put_value_bytes( + key, + lsn, + &VirtualValue::ser(&vvalue)?, + vvalue.will_init(), + )?; + builder = VirtualValueBuilder::new(); + } + + if let Some((old_lsn, vvalue)) = builder.push(lsn, val) { + self.put_value_bytes( + new_key, + old_lsn, + &VirtualValue::ser(&vvalue)?, + vvalue.will_init(), + )?; + } + + self.vvbuilder = Some((new_key, builder)); + } + } + Ok(()) } fn put_value_bytes( @@ -766,7 +899,14 @@ impl DeltaLayerWriterInner { /// /// Finish writing the delta layer. /// - fn finish(self, key_end: Key) -> anyhow::Result { + fn finish(mut self, key_end: Key) -> anyhow::Result { + // first, flush the last key's vvalue to disk. + if let Some((key, builder)) = self.vvbuilder.take() { + if let Some((lsn, vvalue)) = builder.finish() { + self.put_value_bytes(key, lsn, &VirtualValue::ser(&vvalue)?, vvalue.will_init())?; + }; + } + let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -937,7 +1077,7 @@ impl Drop for DeltaLayerWriter { } /// -/// Iterator over all key-value pairse stored in a delta layer +/// Iterator over all key-value pairs stored in a delta layer /// /// FIXME: This creates a Vector to hold the offsets of all key value pairs. /// That takes up quite a lot of memory. 
Should do this in a more streaming @@ -947,6 +1087,7 @@ struct DeltaValueIter<'a> { all_offsets: Vec<(DeltaKey, BlobRef)>, next_idx: usize, reader: BlockCursor>, + decode_queue: Option< as IntoIterator>::IntoIter>, } struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>); @@ -990,12 +1131,21 @@ impl<'a> DeltaValueIter<'a> { all_offsets, next_idx: 0, reader: BlockCursor::new(Adapter(inner)), + decode_queue: None, }; Ok(iter) } fn next_res(&mut self) -> Result> { + if let Some(data) = &mut self.decode_queue { + let res = data.next(); + if let Some((lsn, value)) = res { + let key = self.all_offsets[self.next_idx - 1].0.key(); + return Ok(Some((key, lsn, value))); + } + } + if self.next_idx < self.all_offsets.len() { let (delta_key, blob_ref) = &self.all_offsets[self.next_idx]; @@ -1003,7 +1153,11 @@ impl<'a> DeltaValueIter<'a> { let lsn = delta_key.lsn(); let buf = self.reader.read_blob(blob_ref.pos())?; - let val = Value::des(&buf)?; + let val_vec = VirtualValue::des(&buf)?.into_value_vec(lsn); + let mut iter = val_vec.into_iter(); + let Some((lsn, val)) = iter.next() else { bail!("missing data in VirtualValue") }; + self.decode_queue = Some(iter); + self.next_idx += 1; Ok(Some((key, lsn, val))) } else { diff --git a/pageserver/src/tenant/storage_layer/layer_contents/mod.rs b/pageserver/src/tenant/storage_layer/layer_contents/mod.rs new file mode 100644 index 0000000000..bea159ede8 --- /dev/null +++ b/pageserver/src/tenant/storage_layer/layer_contents/mod.rs @@ -0,0 +1 @@ +pub(crate) mod virtual_value; diff --git a/pageserver/src/tenant/storage_layer/layer_contents/virtual_value.rs b/pageserver/src/tenant/storage_layer/layer_contents/virtual_value.rs new file mode 100644 index 0000000000..5061a89f32 --- /dev/null +++ b/pageserver/src/tenant/storage_layer/layer_contents/virtual_value.rs @@ -0,0 +1,207 @@ +use crate::repository::Value; +use crate::walrecord::NeonWalRecord; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use utils::lsn::Lsn; + 
+/// The value(s) stored in a single delta-layer blob.
+///
+/// A blob is associated with one start LSN, which is kept outside of this
+/// type and handed back in by the caller where it is needed (see
+/// [`VirtualValue::into_value_vec`]). The `Natural*` variants hold exactly one
+/// [`Value`]. The `*Lineage` variants pack a run of consecutive values: the
+/// first value of the run lives at the start LSN, and the WAL records that
+/// follow it are stored together with their own LSNs.
+///
+/// NOTE(review): with a positional serde format the order of variants and
+/// fields is part of the stored format -- confirm before reordering anything.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum VirtualValue {
+    /// A single page image.
+    NaturalImage(Bytes),
+    /// A single WAL record.
+    NaturalWalRecord(NeonWalRecord),
+    /// A page image at the start LSN, followed by WAL records:
+    /// `records[i]` is at `lsns[i]`.
+    ClosedLineage {
+        image: Bytes,
+        lsns: Vec<Lsn>,
+        records: Vec<NeonWalRecord>,
+    },
+    /// A `will_init` WAL record at the start LSN, followed by further WAL
+    /// records: `records[i]` is at `lsns[i]`.
+    ClosedRecLineage {
+        image_rec: NeonWalRecord,
+        lsns: Vec<Lsn>,
+        records: Vec<NeonWalRecord>,
+    },
+    /// WAL records only, none of which initializes the page. `records[0]` is
+    /// at the start LSN and `records[i + 1]` is at `lsns[i]`, so `records` is
+    /// one element longer than `lsns`.
+    OpenLineage {
+        lsns: Vec<Lsn>,
+        records: Vec<NeonWalRecord>,
+    },
+}
+
+impl VirtualValue {
+    /// Expands this value into the individual `(Lsn, Value)` pairs it packs.
+    ///
+    /// `lsn` is the start LSN of the blob; it is paired with the first value.
+    /// The remaining records follow in stored order with their own LSNs.
+    ///
+    /// # Panics
+    ///
+    /// Panics if an `OpenLineage` holds no records at all.
+    pub(crate) fn into_value_vec(self, lsn: Lsn) -> Vec<(Lsn, Value)> {
+        // Split every lineage into "first value" + "tail of records" so that
+        // the expansion below is written only once.
+        let (head, lsns, tail) = match self {
+            VirtualValue::NaturalImage(img) => return vec![(lsn, Value::Image(img))],
+            VirtualValue::NaturalWalRecord(rec) => return vec![(lsn, Value::WalRecord(rec))],
+            VirtualValue::ClosedLineage {
+                image,
+                lsns,
+                records,
+            } => (Value::Image(image), lsns, records.into_iter()),
+            VirtualValue::ClosedRecLineage {
+                image_rec,
+                lsns,
+                records,
+            } => (Value::WalRecord(image_rec), lsns, records.into_iter()),
+            VirtualValue::OpenLineage { lsns, records } => {
+                // The first record belongs to the start LSN; taking it from the
+                // iterator avoids shifting the whole vector as `remove(0)` does.
+                let mut records = records.into_iter();
+                let first = records
+                    .next()
+                    .expect("OpenLineage must contain at least one record");
+                (Value::WalRecord(first), lsns, records)
+            }
+        };
+
+        let mut res = Vec::with_capacity(lsns.len() + 1);
+        res.push((lsn, head));
+        res.extend(lsns.into_iter().zip(tail.map(Value::WalRecord)));
+        res
+    }
+
+    /// Returns `true` if the first value of this blob initializes the page,
+    /// i.e. nothing older is needed to reconstruct it.
+    pub(crate) fn will_init(&self) -> bool {
+        match self {
+            VirtualValue::NaturalWalRecord(rec) => rec.will_init(),
+            VirtualValue::NaturalImage(_)
+            | VirtualValue::ClosedLineage { .. }
+            | VirtualValue::ClosedRecLineage { .. } => true,
+            VirtualValue::OpenLineage { .. } => false,
+        }
+    }
+}
+
+impl From<Value> for VirtualValue {
+    /// Wraps a single [`Value`] in the matching `Natural*` variant.
+    fn from(value: Value) -> Self {
+        match value {
+            Value::Image(img) => VirtualValue::NaturalImage(img),
+            Value::WalRecord(rec) => VirtualValue::NaturalWalRecord(rec),
+        }
+    }
+}
+
+/// Incrementally packs a stream of `(Lsn, Value)` pairs into [`VirtualValue`]s.
+///
+/// Values must be pushed in strictly ascending LSN order. A builder that still
+/// holds a pending value must be consumed with [`VirtualValueBuilder::finish`]
+/// before it goes out of scope; dropping it otherwise panics. An owner that
+/// abandons a builder (for example on an error path) has to call `finish()`
+/// and discard the result.
+pub struct VirtualValueBuilder {
+    /// Start LSN and accumulated value of the run in progress, if any.
+    state: Option<(Lsn, VirtualValue)>,
+}
+
+impl Default for VirtualValueBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl VirtualValueBuilder {
+    /// Creates an empty builder.
+    pub fn new() -> Self {
+        Self { state: None }
+    }
+
+    /// Adds the value at `new_lsn` to the builder.
+    ///
+    /// An image or a `will_init` WAL record starts a new run: the previously
+    /// accumulated run, if any, is returned together with its start LSN and
+    /// must be stored by the caller. Any other WAL record is appended to the
+    /// run in progress and `None` is returned.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `new_lsn` is not greater than the LSN of the last value
+    /// pushed into the run in progress.
+    pub fn push(&mut self, new_lsn: Lsn, value: Value) -> Option<(Lsn, VirtualValue)> {
+        if let Some((start_lsn, pending)) = &self.state {
+            // Compare against the newest LSN of the run, not only its start,
+            // so that out-of-order tail records are caught as well.
+            let last_lsn = match pending {
+                VirtualValue::NaturalImage(_) | VirtualValue::NaturalWalRecord(_) => start_lsn,
+                VirtualValue::ClosedLineage { lsns, .. }
+                | VirtualValue::ClosedRecLineage { lsns, .. }
+                | VirtualValue::OpenLineage { lsns, .. } => lsns.last().unwrap_or(start_lsn),
+            };
+            assert!(
+                new_lsn > *last_lsn,
+                "values must be pushed in ascending LSN order: {} is not greater than {}",
+                new_lsn,
+                last_lsn
+            );
+        }
+
+        let new_rec = match value {
+            Value::WalRecord(rec) if !rec.will_init() => rec,
+            // Images and will_init records never extend a run: hand out the
+            // finished run and start a fresh one with this value.
+            run_start => return self.state.replace((new_lsn, VirtualValue::from(run_start))),
+        };
+
+        let extended = match self.state.take() {
+            None => (new_lsn, VirtualValue::NaturalWalRecord(new_rec)),
+            Some((start_lsn, pending)) => {
+                let run = match pending {
+                    VirtualValue::NaturalImage(image) => VirtualValue::ClosedLineage {
+                        image,
+                        lsns: vec![new_lsn],
+                        records: vec![new_rec],
+                    },
+                    VirtualValue::NaturalWalRecord(first) if first.will_init() => {
+                        VirtualValue::ClosedRecLineage {
+                            image_rec: first,
+                            lsns: vec![new_lsn],
+                            records: vec![new_rec],
+                        }
+                    }
+                    VirtualValue::NaturalWalRecord(first) => VirtualValue::OpenLineage {
+                        lsns: vec![new_lsn],
+                        records: vec![first, new_rec],
+                    },
+                    VirtualValue::ClosedLineage {
+                        image,
+                        mut lsns,
+                        mut records,
+                    } => {
+                        lsns.push(new_lsn);
+                        records.push(new_rec);
+                        VirtualValue::ClosedLineage {
+                            image,
+                            lsns,
+                            records,
+                        }
+                    }
+                    VirtualValue::ClosedRecLineage {
+                        image_rec,
+                        mut lsns,
+                        mut records,
+                    } => {
+                        lsns.push(new_lsn);
+                        records.push(new_rec);
+                        VirtualValue::ClosedRecLineage {
+                            image_rec,
+                            lsns,
+                            records,
+                        }
+                    }
+                    VirtualValue::OpenLineage {
+                        mut lsns,
+                        mut records,
+                    } => {
+                        lsns.push(new_lsn);
+                        records.push(new_rec);
+                        VirtualValue::OpenLineage { lsns, records }
+                    }
+                };
+                (start_lsn, run)
+            }
+        };
+
+        self.state = Some(extended);
+        None
+    }
+
+    /// Consumes the builder and returns the run in progress, if any, together
+    /// with its start LSN.
+    pub fn finish(mut self) -> Option<(Lsn, VirtualValue)> {
+        self.state.take()
+    }
+}
+
+impl Drop for VirtualValueBuilder {
+    /// Guards against silently losing a pending value.
+    fn drop(&mut self) {
+        // Do not assert while the thread is already unwinding: a second panic
+        // would abort the process and hide the original failure.
+        if !std::thread::panicking() {
+            assert!(
+                self.state.is_none(),
+                "VirtualValueBuilder dropped with a pending value; call finish() first"
+            );
+        }
+    }
+}