diff --git a/libs/wal_decoder/Cargo.toml b/libs/wal_decoder/Cargo.toml index 3f80f8fcdb..c8c0f4c990 100644 --- a/libs/wal_decoder/Cargo.toml +++ b/libs/wal_decoder/Cargo.toml @@ -5,7 +5,7 @@ edition.workspace = true license.workspace = true [features] -testing = [] +testing = ["pageserver_api/testing"] [dependencies] anyhow.workspace = true diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index 780fce3d69..684718d220 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -2,15 +2,13 @@ //! raw bytes which represent a raw Postgres WAL record. use crate::models::*; -use bytes::{Buf, Bytes, BytesMut}; -use pageserver_api::key::rel_block_to_key; -use pageserver_api::record::NeonWalRecord; +use crate::serialized_batch::SerializedValueBatch; +use bytes::{Buf, Bytes}; use pageserver_api::reltag::{RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; -use pageserver_api::value::Value; +use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; use postgres_ffi::walrecord::*; -use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ}; use utils::lsn::Lsn; impl InterpretedWalRecord { @@ -21,11 +19,12 @@ impl InterpretedWalRecord { pub fn from_bytes_filtered( buf: Bytes, shard: &ShardIdentity, - lsn: Lsn, + record_end_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { let mut decoded = DecodedWALRecord::default(); decode_wal_record(buf, &mut decoded, pg_version)?; + let xid = decoded.xl_xid; let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) { FlushUncommittedRecords::Yes @@ -33,96 +32,20 @@ impl InterpretedWalRecord { FlushUncommittedRecords::No }; - let metadata_record = MetadataRecord::from_decoded(&decoded, lsn, pg_version)?; - - let mut blocks = Vec::default(); - for blk in decoded.blocks.iter() { - let rel = RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum, - }; - - let key = rel_block_to_key(rel, blk.blkno); - - if !key.is_valid_key_on_write_path() { - anyhow::bail!("Unsupported key decoded at LSN {}: {}", lsn, key); - } - - let key_is_local = shard.is_key_local(&key); - - tracing::debug!( - lsn=%lsn, - key=%key, - "ingest: shard decision {}", - if !key_is_local { "drop" } else { "keep" }, - ); - - if !key_is_local { - if shard.is_shard_zero() { - // Shard 0 tracks relation sizes. Although we will not store this block, we will observe - // its blkno in case it implicitly extends a relation. - blocks.push((key.to_compact(), None)); - } - - continue; - } - - // Instead of storing full-page-image WAL record, - // it is better to store extracted image: we can skip wal-redo - // in this case. Also some FPI records may contain multiple (up to 32) pages, - // so them have to be copied multiple times. 
- // - let value = if blk.apply_image - && blk.has_image - && decoded.xl_rmid == pg_constants::RM_XLOG_ID - && (decoded.xl_info == pg_constants::XLOG_FPI - || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) - // compression of WAL is not yet supported: fall back to storing the original WAL record - && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version) - // do not materialize null pages because them most likely be soon replaced with real data - && blk.bimg_len != 0 - { - // Extract page image from FPI record - let img_len = blk.bimg_len as usize; - let img_offs = blk.bimg_offset as usize; - let mut image = BytesMut::with_capacity(BLCKSZ as usize); - // TODO(vlad): skip the copy - image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); - - if blk.hole_length != 0 { - let tail = image.split_off(blk.hole_offset as usize); - image.resize(image.len() + blk.hole_length as usize, 0u8); - image.unsplit(tail); - } - // - // Match the logic of XLogReadBufferForRedoExtended: - // The page may be uninitialized. If so, we can't set the LSN because - // that would corrupt the page. - // - if !page_is_new(&image) { - page_set_lsn(&mut image, lsn) - } - assert_eq!(image.len(), BLCKSZ as usize); - - Value::Image(image.freeze()) - } else { - Value::WalRecord(NeonWalRecord::Postgres { - will_init: blk.will_init || blk.apply_image, - rec: decoded.record.clone(), - }) - }; - - blocks.push((key.to_compact(), Some(value))); - } + let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?; + let batch = SerializedValueBatch::from_decoded_filtered( + decoded, + shard, + record_end_lsn, + pg_version, + )?; Ok(InterpretedWalRecord { metadata_record, - blocks, - lsn, + batch, + end_lsn: record_end_lsn, flush_uncommitted, - xid: decoded.xl_xid, + xid, }) } } @@ -130,7 +53,7 @@ impl InterpretedWalRecord { impl MetadataRecord { fn from_decoded( decoded: &DecodedWALRecord, - lsn: Lsn, + record_end_lsn: Lsn, pg_version: u32, ) -> anyhow::Result> { // Note: this doesn't actually copy the bytes since @@ -151,7 +74,7 @@ impl MetadataRecord { Ok(None) } pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version), - pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, lsn), + pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn), pg_constants::RM_MULTIXACT_ID => { Self::decode_multixact_record(&mut buf, decoded, pg_version) } @@ -163,7 +86,7 @@ impl MetadataRecord { // // Alternatively, one can make the checkpoint part of the subscription protocol // to the pageserver. This should work fine, but can be done at a later point. - pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, lsn), + pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn), pg_constants::RM_LOGICALMSG_ID => { Self::decode_logical_message_record(&mut buf, decoded) } diff --git a/libs/wal_decoder/src/lib.rs b/libs/wal_decoder/src/lib.rs index 05349d17c9..a8a26956e6 100644 --- a/libs/wal_decoder/src/lib.rs +++ b/libs/wal_decoder/src/lib.rs @@ -1,2 +1,3 @@ pub mod decoder; pub mod models; +pub mod serialized_batch; diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 92b66fcefd..5d90eeb69c 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -2,7 +2,8 @@ //! ready for the pageserver to interpret. They are derived from the original //! WAL records, so that each struct corresponds closely to one WAL record of //! 
a specific kind. They contain the same information as the original WAL records, -//! just decoded into structs and fields for easier access. +//! but the values are already serialized in a [`SerializedValueBatch`], which +//! is the format that the pageserver is expecting them in. //! //! The ingestion code uses these structs to help with parsing the WAL records, //! and it splits them into a stream of modifications to the key-value pairs that @@ -25,9 +26,7 @@ //! |--> write to KV store within the pageserver use bytes::Bytes; -use pageserver_api::key::CompactKey; use pageserver_api::reltag::{RelTag, SlruKind}; -use pageserver_api::value::Value; use postgres_ffi::walrecord::{ XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet, XlSmgrTruncate, XlXactParsedRecord, @@ -35,6 +34,8 @@ use postgres_ffi::walrecord::{ use postgres_ffi::{Oid, TransactionId}; use utils::lsn::Lsn; +use crate::serialized_batch::SerializedValueBatch; + pub enum FlushUncommittedRecords { Yes, No, @@ -45,12 +46,11 @@ pub struct InterpretedWalRecord { /// Optional metadata record - may cause writes to metadata keys /// in the storage engine pub metadata_record: Option, - /// Images or deltas for blocks modified in the original WAL record. - /// The [`Value`] is optional to avoid sending superfluous data to - /// shard 0 for relation size tracking. - pub blocks: Vec<(CompactKey, Option)>, + /// A pre-serialized batch along with the required metadata for ingestion + /// by the pageserver + pub batch: SerializedValueBatch, /// Byte offset within WAL for the end of the original PG WAL record - pub lsn: Lsn, + pub end_lsn: Lsn, /// Whether to flush all uncommitted modifications to the storage engine /// before ingesting this record. This is currently only used for legacy PG /// database creations which read pages from a template database. Such WAL diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs new file mode 100644 index 0000000000..8f33291023 --- /dev/null +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -0,0 +1,862 @@ +//! This module implements batch type for serialized [`pageserver_api::value::Value`] +//! instances. Each batch contains a raw buffer (serialized values) +//! and a list of metadata for each (key, LSN) tuple present in the batch. +//! +//! Such batches are created from decoded PG wal records and ingested +//! by the pageserver by writing directly to the ephemeral file. + +use std::collections::BTreeSet; + +use bytes::{Bytes, BytesMut}; +use pageserver_api::key::rel_block_to_key; +use pageserver_api::keyspace::KeySpace; +use pageserver_api::record::NeonWalRecord; +use pageserver_api::reltag::RelTag; +use pageserver_api::shard::ShardIdentity; +use pageserver_api::{key::CompactKey, value::Value}; +use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord}; +use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ}; +use utils::bin_ser::BeSer; +use utils::lsn::Lsn; + +use pageserver_api::key::Key; + +static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); + +/// Accompanying metadata for the batch +/// A value may be serialized and stored into the batch or just "observed". +/// Shard 0 currently "observes" all values in order to accurately track +/// relation sizes. In the case of "observed" values, we only need to know +/// the key and LSN, so two types of metadata are supported to save on network +/// bandwidth. 
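// Illustrative sketch (not part of the patch): how a consumer of the batch is
// expected to treat the two metadata variants. Serialized entries point into
// `raw` and can be decoded with `Value::des` (via `utils::bin_ser::BeSer`, as
// the tests at the bottom of this file do); Observed entries carry no payload
// and only feed relation-size tracking on shard 0.
fn walk_batch_sketch(batch: &SerializedValueBatch) {
    for meta in batch.metadata.iter() {
        match meta {
            ValueMeta::Serialized(ser) => {
                let start = ser.batch_offset as usize;
                let value = Value::des(&batch.raw[start..start + ser.len]).unwrap();
                // ... hand (ser.key, ser.lsn, value) to the storage engine ...
                let _ = value;
            }
            ValueMeta::Observed(obs) => {
                // No bytes were shipped; only (key, lsn) matters for sizing.
                let _ = (obs.key, obs.lsn);
            }
        }
    }
}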
+pub enum ValueMeta { + Serialized(SerializedValueMeta), + Observed(ObservedValueMeta), +} + +impl ValueMeta { + pub fn key(&self) -> CompactKey { + match self { + Self::Serialized(ser) => ser.key, + Self::Observed(obs) => obs.key, + } + } + + pub fn lsn(&self) -> Lsn { + match self { + Self::Serialized(ser) => ser.lsn, + Self::Observed(obs) => obs.lsn, + } + } +} + +/// Wrapper around [`ValueMeta`] that implements ordering by +/// (key, LSN) tuples +struct OrderedValueMeta(ValueMeta); + +impl Ord for OrderedValueMeta { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn())) + } +} + +impl PartialOrd for OrderedValueMeta { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for OrderedValueMeta { + fn eq(&self, other: &Self) -> bool { + (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn()) + } +} + +impl Eq for OrderedValueMeta {} + +/// Metadata for a [`Value`] serialized into the batch. +pub struct SerializedValueMeta { + pub key: CompactKey, + pub lsn: Lsn, + /// Starting offset of the value for the (key, LSN) tuple + /// in [`SerializedValueBatch::raw`] + pub batch_offset: u64, + pub len: usize, + pub will_init: bool, +} + +/// Metadata for a [`Value`] observed by the batch +pub struct ObservedValueMeta { + pub key: CompactKey, + pub lsn: Lsn, +} + +/// Batch of serialized [`Value`]s. +pub struct SerializedValueBatch { + /// [`Value`]s serialized in EphemeralFile's native format, + /// ready for disk write by the pageserver + pub raw: Vec, + + /// Metadata to make sense of the bytes in [`Self::raw`] + /// and represent "observed" values. + /// + /// Invariant: Metadata entries for any given key are ordered + /// by LSN. Note that entries for a key do not have to be contiguous. + pub metadata: Vec, + + /// The highest LSN of any value in the batch + pub max_lsn: Lsn, + + /// Number of values encoded by [`Self::raw`] + pub len: usize, +} + +impl Default for SerializedValueBatch { + fn default() -> Self { + Self { + raw: Default::default(), + metadata: Default::default(), + max_lsn: Lsn(0), + len: 0, + } + } +} + +impl SerializedValueBatch { + /// Build a batch of serialized values from a decoded PG WAL record + /// + /// The batch will only contain values for keys targeting the specifiec + /// shard. Shard 0 is a special case, where any keys that don't belong to + /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`], + /// but absent from the raw buffer [`SerializedValueBatch::raw`]). + pub(crate) fn from_decoded_filtered( + decoded: DecodedWALRecord, + shard: &ShardIdentity, + record_end_lsn: Lsn, + pg_version: u32, + ) -> anyhow::Result { + // First determine how big the buffer needs to be and allocate it up-front. + // This duplicates some of the work below, but it's empirically much faster. 
+ let estimated_buffer_size = Self::estimate_buffer_size(&decoded, shard, pg_version); + let mut buf = Vec::::with_capacity(estimated_buffer_size); + + let mut metadata: Vec = Vec::with_capacity(decoded.blocks.len()); + let mut max_lsn: Lsn = Lsn(0); + let mut len: usize = 0; + for blk in decoded.blocks.iter() { + let relative_off = buf.len() as u64; + + let rel = RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum, + }; + + let key = rel_block_to_key(rel, blk.blkno); + + if !key.is_valid_key_on_write_path() { + anyhow::bail!("Unsupported key decoded at LSN {}: {}", record_end_lsn, key); + } + + let key_is_local = shard.is_key_local(&key); + + tracing::debug!( + lsn=%record_end_lsn, + key=%key, + "ingest: shard decision {}", + if !key_is_local { "drop" } else { "keep" }, + ); + + if !key_is_local { + if shard.is_shard_zero() { + // Shard 0 tracks relation sizes. Although we will not store this block, we will observe + // its blkno in case it implicitly extends a relation. + metadata.push(ValueMeta::Observed(ObservedValueMeta { + key: key.to_compact(), + lsn: record_end_lsn, + })) + } + + continue; + } + + // Instead of storing full-page-image WAL record, + // it is better to store extracted image: we can skip wal-redo + // in this case. Also some FPI records may contain multiple (up to 32) pages, + // so them have to be copied multiple times. + // + let val = if Self::block_is_image(&decoded, blk, pg_version) { + // Extract page image from FPI record + let img_len = blk.bimg_len as usize; + let img_offs = blk.bimg_offset as usize; + let mut image = BytesMut::with_capacity(BLCKSZ as usize); + // TODO(vlad): skip the copy + image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); + + if blk.hole_length != 0 { + let tail = image.split_off(blk.hole_offset as usize); + image.resize(image.len() + blk.hole_length as usize, 0u8); + image.unsplit(tail); + } + // + // Match the logic of XLogReadBufferForRedoExtended: + // The page may be uninitialized. If so, we can't set the LSN because + // that would corrupt the page. + // + if !page_is_new(&image) { + page_set_lsn(&mut image, record_end_lsn) + } + assert_eq!(image.len(), BLCKSZ as usize); + + Value::Image(image.freeze()) + } else { + Value::WalRecord(NeonWalRecord::Postgres { + will_init: blk.will_init || blk.apply_image, + rec: decoded.record.clone(), + }) + }; + + val.ser_into(&mut buf) + .expect("Writing into in-memory buffer is infallible"); + + let val_ser_size = buf.len() - relative_off as usize; + + metadata.push(ValueMeta::Serialized(SerializedValueMeta { + key: key.to_compact(), + lsn: record_end_lsn, + batch_offset: relative_off, + len: val_ser_size, + will_init: val.will_init(), + })); + max_lsn = std::cmp::max(max_lsn, record_end_lsn); + len += 1; + } + + if cfg!(any(debug_assertions, test)) { + let batch = Self { + raw: buf, + metadata, + max_lsn, + len, + }; + + batch.validate_lsn_order(); + + return Ok(batch); + } + + Ok(Self { + raw: buf, + metadata, + max_lsn, + len, + }) + } + + /// Look into the decoded PG WAL record and determine + /// roughly how large the buffer for serialized values needs to be. 
+ fn estimate_buffer_size( + decoded: &DecodedWALRecord, + shard: &ShardIdentity, + pg_version: u32, + ) -> usize { + let mut estimate: usize = 0; + + for blk in decoded.blocks.iter() { + let rel = RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum, + }; + + let key = rel_block_to_key(rel, blk.blkno); + + if !shard.is_key_local(&key) { + continue; + } + + if Self::block_is_image(decoded, blk, pg_version) { + // 4 bytes for the Value::Image discriminator + // 8 bytes for encoding the size of the buffer + // BLCKSZ for the raw image + estimate += (4 + 8 + BLCKSZ) as usize; + } else { + // 4 bytes for the Value::WalRecord discriminator + // 4 bytes for the NeonWalRecord::Postgres discriminator + // 1 bytes for NeonWalRecord::Postgres::will_init + // 8 bytes for encoding the size of the buffer + // length of the raw record + estimate += 8 + 1 + 8 + decoded.record.len(); + } + } + + estimate + } + + fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool { + blk.apply_image + && blk.has_image + && decoded.xl_rmid == pg_constants::RM_XLOG_ID + && (decoded.xl_info == pg_constants::XLOG_FPI + || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) + // compression of WAL is not yet supported: fall back to storing the original WAL record + && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version) + // do not materialize null pages because them most likely be soon replaced with real data + && blk.bimg_len != 0 + } + + /// Encode a list of values and metadata into a serialized batch + /// + /// This is used by the pageserver ingest code to conveniently generate + /// batches for metadata writes. + pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self { + // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by + // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`] + let buffer_size = batch.iter().map(|i| i.2).sum::(); + let mut buf = Vec::::with_capacity(buffer_size); + + let mut metadata: Vec = Vec::with_capacity(batch.len()); + let mut max_lsn: Lsn = Lsn(0); + let len = batch.len(); + for (key, lsn, val_ser_size, val) in batch { + let relative_off = buf.len() as u64; + + val.ser_into(&mut buf) + .expect("Writing into in-memory buffer is infallible"); + + metadata.push(ValueMeta::Serialized(SerializedValueMeta { + key, + lsn, + batch_offset: relative_off, + len: val_ser_size, + will_init: val.will_init(), + })); + max_lsn = std::cmp::max(max_lsn, lsn); + } + + // Assert that we didn't do any extra allocations while building buffer. + debug_assert!(buf.len() <= buffer_size); + + if cfg!(any(debug_assertions, test)) { + let batch = Self { + raw: buf, + metadata, + max_lsn, + len, + }; + + batch.validate_lsn_order(); + + return batch; + } + + Self { + raw: buf, + metadata, + max_lsn, + len, + } + } + + /// Add one value to the batch + /// + /// This is used by the pageserver ingest code to include metadata block + /// updates for a single key. 
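// Hypothetical usage sketch (not part of the patch): building a batch
// incrementally, the way `DatadirModification` buffers zero-page and metadata
// writes. `some_key` and `lsn` are placeholders.
fn put_sketch(some_key: CompactKey, lsn: Lsn) -> SerializedValueBatch {
    let mut batch = SerializedValueBatch::default();
    // `put` serializes the value into `raw` and appends a Serialized metadata
    // entry recording its offset, length and `will_init` flag.
    batch.put(some_key, Value::Image(ZERO_PAGE.clone()), lsn);
    assert_eq!(batch.len(), 1);
    batch
}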
+ pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) { + let relative_off = self.raw.len() as u64; + value.ser_into(&mut self.raw).unwrap(); + + let val_ser_size = self.raw.len() - relative_off as usize; + self.metadata + .push(ValueMeta::Serialized(SerializedValueMeta { + key, + lsn, + batch_offset: relative_off, + len: val_ser_size, + will_init: value.will_init(), + })); + + self.max_lsn = std::cmp::max(self.max_lsn, lsn); + self.len += 1; + + if cfg!(any(debug_assertions, test)) { + self.validate_lsn_order(); + } + } + + /// Extend with the contents of another batch + /// + /// One batch is generated for each decoded PG WAL record. + /// They are then merged to accumulate reasonably sized writes. + pub fn extend(&mut self, mut other: SerializedValueBatch) { + let extend_batch_start_offset = self.raw.len() as u64; + + self.raw.extend(other.raw); + + // Shift the offsets in the batch we are extending with + other.metadata.iter_mut().for_each(|meta| match meta { + ValueMeta::Serialized(ser) => { + ser.batch_offset += extend_batch_start_offset; + if cfg!(debug_assertions) { + let value_end = ser.batch_offset + ser.len as u64; + assert!((value_end as usize) <= self.raw.len()); + } + } + ValueMeta::Observed(_) => {} + }); + self.metadata.extend(other.metadata); + + self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn); + + self.len += other.len; + + if cfg!(any(debug_assertions, test)) { + self.validate_lsn_order(); + } + } + + /// Add zero images for the (key, LSN) tuples specified + /// + /// PG versions below 16 do not zero out pages before extending + /// a relation and may leave gaps. Such gaps need to be identified + /// by the pageserver ingest logic and get patched up here. + /// + /// Note that this function does not validate that the gaps have been + /// identified correctly (it does not know relation sizes), so it's up + /// to the call-site to do it properly. + pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) { + // Implementation note: + // + // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements, + // but the metadata entries should be ordered properly (see + // [`SerializedValueBatch::metadata`]). + // + // Exploiting this observation we do: + // 1. Drain all the metadata entries into an ordered set. + // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never + // includes more than one update to the same block in the same WAL record. + // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer + // and add an index entry to the ordered metadata set. + // 3. Drain the ordered set back into a metadata vector + + let mut ordered_metas = self + .metadata + .drain(..) + .map(OrderedValueMeta) + .collect::>(); + for (keyspace, lsn) in gaps { + self.max_lsn = std::cmp::max(self.max_lsn, lsn); + + for gap_range in keyspace.ranges { + let mut key = gap_range.start; + while key != gap_range.end { + let relative_off = self.raw.len() as u64; + + // TODO(vlad): Can we be cheeky and write only one zero image, and + // make all index entries requiring a zero page point to it? + // Alternatively, we can change the index entry format to represent zero pages + // without writing them at all. 
+ Value::Image(ZERO_PAGE.clone()) + .ser_into(&mut self.raw) + .unwrap(); + let val_ser_size = self.raw.len() - relative_off as usize; + + ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized( + SerializedValueMeta { + key: key.to_compact(), + lsn, + batch_offset: relative_off, + len: val_ser_size, + will_init: true, + }, + ))); + + self.len += 1; + + key = key.next(); + } + } + } + + self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect(); + + if cfg!(any(debug_assertions, test)) { + self.validate_lsn_order(); + } + } + + /// Checks if the batch is empty + /// + /// A batch is empty when it contains no serialized values. + /// Note that it may still contain observed values. + pub fn is_empty(&self) -> bool { + let empty = self.raw.is_empty(); + + if cfg!(debug_assertions) && empty { + assert!(self + .metadata + .iter() + .all(|meta| matches!(meta, ValueMeta::Observed(_)))); + } + + empty + } + + /// Returns the number of values serialized in the batch + pub fn len(&self) -> usize { + self.len + } + + /// Returns the size of the buffer wrapped by the batch + pub fn buffer_size(&self) -> usize { + self.raw.len() + } + + pub fn updates_key(&self, key: &Key) -> bool { + self.metadata.iter().any(|meta| match meta { + ValueMeta::Serialized(ser) => key.to_compact() == ser.key, + ValueMeta::Observed(_) => false, + }) + } + + pub fn validate_lsn_order(&self) { + use std::collections::HashMap; + + let mut last_seen_lsn_per_key: HashMap = HashMap::default(); + + for meta in self.metadata.iter() { + let lsn = meta.lsn(); + let key = meta.key(); + + if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) { + assert!( + lsn >= prev_lsn, + "Ordering violated by {}: {} < {}", + Key::from_compact(key), + lsn, + prev_lsn + ); + } + } + } +} + +#[cfg(all(test, feature = "testing"))] +mod tests { + use super::*; + + fn validate_batch( + batch: &SerializedValueBatch, + values: &[(CompactKey, Lsn, usize, Value)], + gaps: Option<&Vec<(KeySpace, Lsn)>>, + ) { + // Invariant 1: The metadata for a given entry in the batch + // is correct and can be used to deserialize back to the original value. + for (key, lsn, size, value) in values.iter() { + let meta = batch + .metadata + .iter() + .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn)) + .unwrap(); + let meta = match meta { + ValueMeta::Serialized(ser) => ser, + ValueMeta::Observed(_) => unreachable!(), + }; + + assert_eq!(meta.len, *size); + assert_eq!(meta.will_init, value.will_init()); + + let start = meta.batch_offset as usize; + let end = meta.batch_offset as usize + meta.len; + let value_from_batch = Value::des(&batch.raw[start..end]).unwrap(); + assert_eq!(&value_from_batch, value); + } + + let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum(); + let mut gap_pages_count: usize = 0; + + // Invariant 2: Zero pages were added for identified gaps and their metadata + // is correct. 
+ if let Some(gaps) = gaps { + for (gap_keyspace, lsn) in gaps { + for gap_range in &gap_keyspace.ranges { + let mut gap_key = gap_range.start; + while gap_key != gap_range.end { + let meta = batch + .metadata + .iter() + .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn)) + .unwrap(); + let meta = match meta { + ValueMeta::Serialized(ser) => ser, + ValueMeta::Observed(_) => unreachable!(), + }; + + let zero_value = Value::Image(ZERO_PAGE.clone()); + let zero_value_size = zero_value.serialized_size().unwrap() as usize; + + assert_eq!(meta.len, zero_value_size); + assert_eq!(meta.will_init, zero_value.will_init()); + + let start = meta.batch_offset as usize; + let end = meta.batch_offset as usize + meta.len; + let value_from_batch = Value::des(&batch.raw[start..end]).unwrap(); + assert_eq!(value_from_batch, zero_value); + + gap_pages_count += 1; + expected_buffer_size += zero_value_size; + gap_key = gap_key.next(); + } + } + } + } + + // Invariant 3: The length of the batch is equal to the number + // of values inserted, plus the number of gap pages. This extends + // to the raw buffer size. + assert_eq!(batch.len(), values.len() + gap_pages_count); + assert_eq!(expected_buffer_size, batch.buffer_size()); + + // Invariant 4: Metadata entries for any given key are sorted in LSN order. + batch.validate_lsn_order(); + } + + #[test] + fn test_creation_from_values() { + const LSN: Lsn = Lsn(0x10); + let key = Key::from_hex("110000000033333333444444445500000001").unwrap(); + + let values = vec![ + ( + key.to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("foo")), + ), + ( + key.next().to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("bar")), + ), + ( + key.to_compact(), + Lsn(LSN.0 + 0x10), + Value::WalRecord(NeonWalRecord::wal_append("baz")), + ), + ( + key.next().next().to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("taz")), + ), + ]; + + let values = values + .into_iter() + .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value)) + .collect::>(); + let batch = SerializedValueBatch::from_values(values.clone()); + + validate_batch(&batch, &values, None); + + assert!(!batch.is_empty()); + } + + #[test] + fn test_put() { + const LSN: Lsn = Lsn(0x10); + let key = Key::from_hex("110000000033333333444444445500000001").unwrap(); + + let values = vec![ + ( + key.to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("foo")), + ), + ( + key.next().to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("bar")), + ), + ]; + + let mut values = values + .into_iter() + .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value)) + .collect::>(); + let mut batch = SerializedValueBatch::from_values(values.clone()); + + validate_batch(&batch, &values, None); + + let value = ( + key.to_compact(), + Lsn(LSN.0 + 0x10), + Value::WalRecord(NeonWalRecord::wal_append("baz")), + ); + let serialized_size = value.2.serialized_size().unwrap() as usize; + let value = (value.0, value.1, serialized_size, value.2); + values.push(value.clone()); + batch.put(value.0, value.3, value.1); + + validate_batch(&batch, &values, None); + + let value = ( + key.next().next().to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("taz")), + ); + let serialized_size = value.2.serialized_size().unwrap() as usize; + let value = (value.0, value.1, serialized_size, value.2); + values.push(value.clone()); + batch.put(value.0, value.3, value.1); + + validate_batch(&batch, &values, None); 
+ } + + #[test] + fn test_extension() { + const LSN: Lsn = Lsn(0x10); + let key = Key::from_hex("110000000033333333444444445500000001").unwrap(); + + let values = vec![ + ( + key.to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("foo")), + ), + ( + key.next().to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("bar")), + ), + ( + key.next().next().to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("taz")), + ), + ]; + + let mut values = values + .into_iter() + .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value)) + .collect::>(); + let mut batch = SerializedValueBatch::from_values(values.clone()); + + let other_values = vec![ + ( + key.to_compact(), + Lsn(LSN.0 + 0x10), + Value::WalRecord(NeonWalRecord::wal_append("foo")), + ), + ( + key.next().to_compact(), + Lsn(LSN.0 + 0x10), + Value::WalRecord(NeonWalRecord::wal_append("bar")), + ), + ( + key.next().next().to_compact(), + Lsn(LSN.0 + 0x10), + Value::WalRecord(NeonWalRecord::wal_append("taz")), + ), + ]; + + let other_values = other_values + .into_iter() + .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value)) + .collect::>(); + let other_batch = SerializedValueBatch::from_values(other_values.clone()); + + values.extend(other_values); + batch.extend(other_batch); + + validate_batch(&batch, &values, None); + } + + #[test] + fn test_gap_zeroing() { + const LSN: Lsn = Lsn(0x10); + let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap(); + + let rel_bar_base_key = { + let mut key = rel_foo_base_key; + key.field4 += 1; + key + }; + + let values = vec![ + ( + rel_foo_base_key.to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("foo1")), + ), + ( + rel_foo_base_key.add(1).to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("foo2")), + ), + ( + rel_foo_base_key.add(5).to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("foo3")), + ), + ( + rel_foo_base_key.add(1).to_compact(), + Lsn(LSN.0 + 0x10), + Value::WalRecord(NeonWalRecord::wal_append("foo4")), + ), + ( + rel_foo_base_key.add(10).to_compact(), + Lsn(LSN.0 + 0x10), + Value::WalRecord(NeonWalRecord::wal_append("foo5")), + ), + ( + rel_foo_base_key.add(11).to_compact(), + Lsn(LSN.0 + 0x10), + Value::WalRecord(NeonWalRecord::wal_append("foo6")), + ), + ( + rel_foo_base_key.add(12).to_compact(), + Lsn(LSN.0 + 0x10), + Value::WalRecord(NeonWalRecord::wal_append("foo7")), + ), + ( + rel_bar_base_key.to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("bar1")), + ), + ( + rel_bar_base_key.add(4).to_compact(), + LSN, + Value::WalRecord(NeonWalRecord::wal_append("bar2")), + ), + ]; + + let values = values + .into_iter() + .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value)) + .collect::>(); + + let mut batch = SerializedValueBatch::from_values(values.clone()); + + let gaps = vec![ + ( + KeySpace { + ranges: vec![ + rel_foo_base_key.add(2)..rel_foo_base_key.add(5), + rel_bar_base_key.add(1)..rel_bar_base_key.add(4), + ], + }, + LSN, + ), + ( + KeySpace { + ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)], + }, + Lsn(LSN.0 + 0x10), + ), + ]; + + batch.zero_gaps(gaps.clone()); + validate_batch(&batch, &values, Some(&gaps)); + } +} diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index 0a1ad9cd6b..f6b2a8e031 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -9,7 +9,6 @@ use 
pageserver::{ l0_flush::{L0FlushConfig, L0FlushGlobalState}, page_cache, task_mgr::TaskKind, - tenant::storage_layer::inmemory_layer::SerializedBatch, tenant::storage_layer::InMemoryLayer, virtual_file, }; @@ -18,6 +17,7 @@ use utils::{ bin_ser::BeSer, id::{TenantId, TimelineId}, }; +use wal_decoder::serialized_batch::SerializedValueBatch; // A very cheap hash for generating non-sequential keys. fn murmurhash32(mut h: u32) -> u32 { @@ -102,13 +102,13 @@ async fn ingest( batch.push((key.to_compact(), lsn, data_ser_size, data.clone())); if batch.len() >= BATCH_SIZE { let this_batch = std::mem::take(&mut batch); - let serialized = SerializedBatch::from_values(this_batch).unwrap(); + let serialized = SerializedValueBatch::from_values(this_batch); layer.put_batch(serialized, &ctx).await?; } } if !batch.is_empty() { let this_batch = std::mem::take(&mut batch); - let serialized = SerializedBatch::from_values(this_batch).unwrap(); + let serialized = SerializedValueBatch::from_values(this_batch); layer.put_batch(serialized, &ctx).await?; } layer.freeze(lsn + 1).await; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index dc2dc08b53..7b106569a4 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -24,6 +24,7 @@ use pageserver_api::key::{ use pageserver_api::keyspace::SparseKeySpace; use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; +use pageserver_api::shard::ShardIdentity; use pageserver_api::value::Value; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; @@ -38,6 +39,7 @@ use tracing::{debug, trace, warn}; use utils::bin_ser::DeserializeError; use utils::pausable_failpoint; use utils::{bin_ser::BeSer, lsn::Lsn}; +use wal_decoder::serialized_batch::SerializedValueBatch; /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached. pub const MAX_AUX_FILE_DELTAS: usize = 1024; @@ -170,12 +172,11 @@ impl Timeline { tline: self, pending_lsns: Vec::new(), pending_metadata_pages: HashMap::new(), - pending_data_pages: Vec::new(), - pending_zero_data_pages: Default::default(), + pending_data_batch: None, pending_deletions: Vec::new(), pending_nblocks: 0, pending_directory_entries: Vec::new(), - pending_bytes: 0, + pending_metadata_bytes: 0, lsn, } } @@ -1025,21 +1026,14 @@ pub struct DatadirModification<'a> { /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for /// which keys are stored here. - pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>, - - // Sometimes during ingest, for example when extending a relation, we would like to write a zero page. However, - // if we encounter a write from postgres in the same wal record, we will drop this entry. - // - // Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed - // at the end of each wal record, and all these writes implicitly are at lsn Self::lsn - pending_zero_data_pages: HashSet, + pending_data_batch: Option, /// For special "directory" keys that store key-value maps, track the size of the map /// if it was updated in this modification. pending_directory_entries: Vec<(DirectoryKind, usize)>, - /// An **approximation** of how large our EphemeralFile write will be when committed. - pending_bytes: usize, + /// An **approximation** of how many metadata bytes will be written to the EphemeralFile. 
+ pending_metadata_bytes: usize, } impl<'a> DatadirModification<'a> { @@ -1054,11 +1048,17 @@ impl<'a> DatadirModification<'a> { } pub(crate) fn approx_pending_bytes(&self) -> usize { - self.pending_bytes + self.pending_data_batch + .as_ref() + .map_or(0, |b| b.buffer_size()) + + self.pending_metadata_bytes } - pub(crate) fn has_dirty_data_pages(&self) -> bool { - (!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty()) + pub(crate) fn has_dirty_data(&self) -> bool { + !self + .pending_data_batch + .as_ref() + .map_or(true, |b| b.is_empty()) } /// Set the current lsn @@ -1070,9 +1070,6 @@ impl<'a> DatadirModification<'a> { self.lsn ); - // If we are advancing LSN, then state from previous wal record should have been flushed. - assert!(self.pending_zero_data_pages.is_empty()); - if lsn > self.lsn { self.pending_lsns.push(self.lsn); self.lsn = lsn; @@ -1147,6 +1144,107 @@ impl<'a> DatadirModification<'a> { Ok(()) } + /// Creates a relation if it is not already present. + /// Returns the current size of the relation + pub(crate) async fn create_relation_if_required( + &mut self, + rel: RelTag, + ctx: &RequestContext, + ) -> Result { + // Get current size and put rel creation if rel doesn't exist + // + // NOTE: we check the cache first even though get_rel_exists and get_rel_size would + // check the cache too. This is because eagerly checking the cache results in + // less work overall and 10% better performance. It's more work on cache miss + // but cache miss is rare. + if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) { + Ok(nblocks) + } else if !self + .tline + .get_rel_exists(rel, Version::Modified(self), ctx) + .await? + { + // create it with 0 size initially, the logic below will extend it + self.put_rel_creation(rel, 0, ctx) + .await + .context("Relation Error")?; + Ok(0) + } else { + self.tline + .get_rel_size(rel, Version::Modified(self), ctx) + .await + } + } + + /// Given a block number for a relation (which represents a newly written block), + /// the previous block count of the relation, and the shard info, find the gaps + /// that were created by the newly written block if any. 
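// Illustrative sketch with hypothetical block numbers (not part of the patch),
// mirroring the `gap_finding` test below: if the relation previously had 3
// blocks and a record writes block 6, then blocks 3, 4 and 5 are gaps, filtered
// down to the blocks owned by this shard.
fn find_gaps_sketch(rel: RelTag, shard: &ShardIdentity) -> Option<KeySpace> {
    // previous_nblocks = 3, newly written blkno = 6
    DatadirModification::find_gaps(rel, 6, 3, shard)
}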
+ fn find_gaps( + rel: RelTag, + blkno: u32, + previous_nblocks: u32, + shard: &ShardIdentity, + ) -> Option { + let mut key = rel_block_to_key(rel, blkno); + let mut gap_accum = None; + + for gap_blkno in previous_nblocks..blkno { + key.field6 = gap_blkno; + + if shard.get_shard_number(&key) != shard.number { + continue; + } + + gap_accum + .get_or_insert_with(KeySpaceAccum::new) + .add_key(key); + } + + gap_accum.map(|accum| accum.to_keyspace()) + } + + pub async fn ingest_batch( + &mut self, + mut batch: SerializedValueBatch, + // TODO(vlad): remove this argument and replace the shard check with is_key_local + shard: &ShardIdentity, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let mut gaps_at_lsns = Vec::default(); + + for meta in batch.metadata.iter() { + let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?; + let new_nblocks = blkno + 1; + + let old_nblocks = self.create_relation_if_required(rel, ctx).await?; + if new_nblocks > old_nblocks { + self.put_rel_extend(rel, new_nblocks, ctx).await?; + } + + if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) { + gaps_at_lsns.push((gaps, meta.lsn())); + } + } + + if !gaps_at_lsns.is_empty() { + batch.zero_gaps(gaps_at_lsns); + } + + match self.pending_data_batch.as_mut() { + Some(pending_batch) => { + pending_batch.extend(batch); + } + None if !batch.is_empty() => { + self.pending_data_batch = Some(batch); + } + None => { + // Nothing to initialize the batch with + } + } + + Ok(()) + } + /// Put a new page version that can be constructed from a WAL record /// /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the @@ -1229,8 +1327,13 @@ impl<'a> DatadirModification<'a> { self.lsn ); } - self.pending_zero_data_pages.insert(key.to_compact()); - self.pending_bytes += ZERO_PAGE.len(); + + let batch = self + .pending_data_batch + .get_or_insert_with(SerializedValueBatch::default); + + batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn); + Ok(()) } @@ -1248,17 +1351,14 @@ impl<'a> DatadirModification<'a> { self.lsn ); } - self.pending_zero_data_pages.insert(key.to_compact()); - self.pending_bytes += ZERO_PAGE.len(); - Ok(()) - } - /// Call this at the end of each WAL record. - pub(crate) fn on_record_end(&mut self) { - let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages); - for key in pending_zero_data_pages { - self.put_data(key, Value::Image(ZERO_PAGE.clone())); - } + let batch = self + .pending_data_batch + .get_or_insert_with(SerializedValueBatch::default); + + batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn); + + Ok(()) } /// Store a relmapper file (pg_filenode.map) in the repository @@ -1750,12 +1850,17 @@ impl<'a> DatadirModification<'a> { let mut writer = self.tline.writer().await; // Flush relation and SLRU data blocks, keep metadata. - let pending_data_pages = std::mem::take(&mut self.pending_data_pages); + if let Some(batch) = self.pending_data_batch.take() { + tracing::debug!( + "Flushing batch with max_lsn={}. Last record LSN is {}", + batch.max_lsn, + self.tline.get_last_record_lsn() + ); - // This bails out on first error without modifying pending_updates. - // That's Ok, cf this function's doc comment. - writer.put_batch(pending_data_pages, ctx).await?; - self.pending_bytes = 0; + // This bails out on first error without modifying pending_updates. + // That's Ok, cf this function's doc comment. 
+ writer.put_batch(batch, ctx).await?; + } if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); @@ -1775,9 +1880,6 @@ impl<'a> DatadirModification<'a> { /// All the modifications in this atomic update are stamped by the specified LSN. /// pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> { - // Commit should never be called mid-wal-record - assert!(self.pending_zero_data_pages.is_empty()); - let mut writer = self.tline.writer().await; let pending_nblocks = self.pending_nblocks; @@ -1785,21 +1887,49 @@ impl<'a> DatadirModification<'a> { // Ordering: the items in this batch do not need to be in any global order, but values for // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on - // this to do efficient updates to its index. - let mut write_batch = std::mem::take(&mut self.pending_data_pages); + // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for + // more details. - write_batch.extend( - self.pending_metadata_pages + let metadata_batch = { + let pending_meta = self + .pending_metadata_pages .drain() .flat_map(|(key, values)| { values .into_iter() .map(move |(lsn, value_size, value)| (key, lsn, value_size, value)) - }), - ); + }) + .collect::>(); - if !write_batch.is_empty() { - writer.put_batch(write_batch, ctx).await?; + if pending_meta.is_empty() { + None + } else { + Some(SerializedValueBatch::from_values(pending_meta)) + } + }; + + let data_batch = self.pending_data_batch.take(); + + let maybe_batch = match (data_batch, metadata_batch) { + (Some(mut data), Some(metadata)) => { + data.extend(metadata); + Some(data) + } + (Some(data), None) => Some(data), + (None, Some(metadata)) => Some(metadata), + (None, None) => None, + }; + + if let Some(batch) = maybe_batch { + tracing::debug!( + "Flushing batch with max_lsn={}. Last record LSN is {}", + batch.max_lsn, + self.tline.get_last_record_lsn() + ); + + // This bails out on first error without modifying pending_updates. + // That's Ok, cf this function's doc comment. + writer.put_batch(batch, ctx).await?; } if !self.pending_deletions.is_empty() { @@ -1809,6 +1939,9 @@ impl<'a> DatadirModification<'a> { self.pending_lsns.push(self.lsn); for pending_lsn in self.pending_lsns.drain(..) { + // TODO(vlad): pretty sure the comment below is not valid anymore + // and we can call finish write with the latest LSN + // // Ideally, we should be able to call writer.finish_write() only once // with the highest LSN. However, the last_record_lsn variable in the // timeline keeps track of the latest LSN and the immediate previous LSN @@ -1824,14 +1957,14 @@ impl<'a> DatadirModification<'a> { writer.update_directory_entries_count(kind, count as u64); } - self.pending_bytes = 0; + self.pending_metadata_bytes = 0; Ok(()) } pub(crate) fn len(&self) -> usize { self.pending_metadata_pages.len() - + self.pending_data_pages.len() + + self.pending_data_batch.as_ref().map_or(0, |b| b.len()) + self.pending_deletions.len() } @@ -1873,11 +2006,10 @@ impl<'a> DatadirModification<'a> { // modifications before ingesting DB create operations, which are the only kind that reads // data pages during ingest. 
if cfg!(debug_assertions) { - for (dirty_key, _, _, _) in &self.pending_data_pages { - debug_assert!(&key.to_compact() != dirty_key); - } - - debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact())) + assert!(!self + .pending_data_batch + .as_ref() + .map_or(false, |b| b.updates_key(&key))); } } @@ -1895,18 +2027,10 @@ impl<'a> DatadirModification<'a> { } fn put_data(&mut self, key: CompactKey, val: Value) { - let val_serialized_size = val.serialized_size().unwrap() as usize; - - // If this page was previously zero'd in the same WalRecord, then drop the previous zero page write. This - // is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend), - // and the subsequent postgres-originating write - if self.pending_zero_data_pages.remove(&key) { - self.pending_bytes -= ZERO_PAGE.len(); - } - - self.pending_bytes += val_serialized_size; - self.pending_data_pages - .push((key, self.lsn, val_serialized_size, val)) + let batch = self + .pending_data_batch + .get_or_insert_with(SerializedValueBatch::default); + batch.put(key, val, self.lsn); } fn put_metadata(&mut self, key: CompactKey, val: Value) { @@ -1914,10 +2038,10 @@ impl<'a> DatadirModification<'a> { // Replace the previous value if it exists at the same lsn if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() { if *last_lsn == self.lsn { - // Update the pending_bytes contribution from this entry, and update the serialized size in place - self.pending_bytes -= *last_value_ser_size; + // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place + self.pending_metadata_bytes -= *last_value_ser_size; *last_value_ser_size = val.serialized_size().unwrap() as usize; - self.pending_bytes += *last_value_ser_size; + self.pending_metadata_bytes += *last_value_ser_size; // Use the latest value, this replaces any earlier write to the same (key,lsn), such as much // have been generated by synthesized zero page writes prior to the first real write to a page. @@ -1927,8 +2051,12 @@ impl<'a> DatadirModification<'a> { } let val_serialized_size = val.serialized_size().unwrap() as usize; - self.pending_bytes += val_serialized_size; + self.pending_metadata_bytes += val_serialized_size; values.push((self.lsn, val_serialized_size, val)); + + if key == CHECKPOINT_KEY.to_compact() { + tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}"); + } } fn delete(&mut self, key_range: Range) { @@ -2037,7 +2165,11 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); #[cfg(test)] mod tests { use hex_literal::hex; - use utils::id::TimelineId; + use pageserver_api::{models::ShardParameters, shard::ShardStripeSize}; + use utils::{ + id::TimelineId, + shard::{ShardCount, ShardNumber}, + }; use super::*; @@ -2091,6 +2223,93 @@ mod tests { Ok(()) } + #[test] + fn gap_finding() { + let rel = RelTag { + spcnode: 1663, + dbnode: 208101, + relnode: 2620, + forknum: 0, + }; + let base_blkno = 1; + + let base_key = rel_block_to_key(rel, base_blkno); + let before_base_key = rel_block_to_key(rel, base_blkno - 1); + + let shard = ShardIdentity::unsharded(); + + let mut previous_nblocks = 0; + for i in 0..10 { + let crnt_blkno = base_blkno + i; + let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard); + + previous_nblocks = crnt_blkno + 1; + + if i == 0 { + // The first block we write is 1, so we should find the gap. 
+ assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key)); + } else { + assert!(gaps.is_none()); + } + } + + // This is an update to an already existing block. No gaps here. + let update_blkno = 5; + let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard); + assert!(gaps.is_none()); + + // This is an update past the current end block. + let after_gap_blkno = 20; + let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard); + + let gap_start_key = rel_block_to_key(rel, previous_nblocks); + let after_gap_key = rel_block_to_key(rel, after_gap_blkno); + assert_eq!( + gaps.unwrap(), + KeySpace::single(gap_start_key..after_gap_key) + ); + } + + #[test] + fn sharded_gap_finding() { + let rel = RelTag { + spcnode: 1663, + dbnode: 208101, + relnode: 2620, + forknum: 0, + }; + + let first_blkno = 6; + + // This shard will get the even blocks + let shard = ShardIdentity::from_params( + ShardNumber(0), + &ShardParameters { + count: ShardCount(2), + stripe_size: ShardStripeSize(1), + }, + ); + + // Only keys belonging to this shard are considered as gaps. + let mut previous_nblocks = 0; + let gaps = + DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap(); + assert!(!gaps.ranges.is_empty()); + for gap_range in gaps.ranges { + let mut k = gap_range.start; + while k != gap_range.end { + assert_eq!(shard.get_shard_number(&k), shard.number); + k = k.next(); + } + } + + previous_nblocks = first_blkno; + + let update_blkno = 2; + let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard); + assert!(gaps.is_none()); + } + /* fn assert_current_logical_size(timeline: &DatadirTimeline, lsn: Lsn) { let incremental = timeline.get_current_logical_size(); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index df448a0963..2ce26ed2eb 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -12,7 +12,7 @@ use crate::tenant::timeline::GetVectoredError; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::{l0_flush, page_cache}; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Result}; use camino::Utf8PathBuf; use pageserver_api::key::CompactKey; use pageserver_api::key::Key; @@ -25,6 +25,7 @@ use std::sync::{Arc, OnceLock}; use std::time::Instant; use tracing::*; use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap}; +use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta}; // avoid binding to Write (conflicts with std::io::Write) // while being able to use std::fmt::Write's methods use crate::metrics::TIMELINE_EPHEMERAL_BYTES; @@ -452,6 +453,7 @@ impl InMemoryLayer { len, will_init, } = index_entry.unpack(); + reads.entry(key).or_default().push(ValueRead { entry_lsn: *entry_lsn, read: vectored_dio_read::LogicalRead::new( @@ -513,68 +515,6 @@ impl InMemoryLayer { } } -/// Offset of a particular Value within a serialized batch. -struct SerializedBatchOffset { - key: CompactKey, - lsn: Lsn, - // TODO: separate type when we start serde-serializing this value, to avoid coupling - // in-memory representation to serialization format. - index_entry: IndexEntry, -} - -pub struct SerializedBatch { - /// Blobs serialized in EphemeralFile's native format, ready for passing to [`EphemeralFile::write_raw`]. 
- pub(crate) raw: Vec, - - /// Index of values in [`Self::raw`], using offsets relative to the start of the buffer. - offsets: Vec, - - /// The highest LSN of any value in the batch - pub(crate) max_lsn: Lsn, -} - -impl SerializedBatch { - pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> anyhow::Result { - // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by - // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`] - let buffer_size = batch.iter().map(|i| i.2).sum::(); - let mut cursor = std::io::Cursor::new(Vec::::with_capacity(buffer_size)); - - let mut offsets: Vec = Vec::with_capacity(batch.len()); - let mut max_lsn: Lsn = Lsn(0); - for (key, lsn, val_ser_size, val) in batch { - let relative_off = cursor.position(); - - val.ser_into(&mut cursor) - .expect("Writing into in-memory buffer is infallible"); - - offsets.push(SerializedBatchOffset { - key, - lsn, - index_entry: IndexEntry::new(IndexEntryNewArgs { - base_offset: 0, - batch_offset: relative_off, - len: val_ser_size, - will_init: val.will_init(), - }) - .context("higher-level code ensures that values are within supported ranges")?, - }); - max_lsn = std::cmp::max(max_lsn, lsn); - } - - let buffer = cursor.into_inner(); - - // Assert that we didn't do any extra allocations while building buffer. - debug_assert!(buffer.len() <= buffer_size); - - Ok(Self { - raw: buffer, - offsets, - max_lsn, - }) - } -} - fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result { write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0) } @@ -642,7 +582,7 @@ impl InMemoryLayer { /// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors. pub async fn put_batch( &self, - serialized_batch: SerializedBatch, + serialized_batch: SerializedValueBatch, ctx: &RequestContext, ) -> anyhow::Result<()> { let mut inner = self.inner.write().await; @@ -650,27 +590,13 @@ impl InMemoryLayer { let base_offset = inner.file.len(); - let SerializedBatch { + let SerializedValueBatch { raw, - mut offsets, + metadata, max_lsn: _, + len: _, } = serialized_batch; - // Add the base_offset to the batch's index entries which are relative to the batch start. - for offset in &mut offsets { - let IndexEntryUnpacked { - will_init, - len, - pos, - } = offset.index_entry.unpack(); - offset.index_entry = IndexEntry::new(IndexEntryNewArgs { - base_offset, - batch_offset: pos, - len: len.into_usize(), - will_init, - })?; - } - // Write the batch to the file inner.file.write_raw(&raw, ctx).await?; let new_size = inner.file.len(); @@ -683,12 +609,28 @@ impl InMemoryLayer { assert_eq!(new_size, expected_new_len); // Update the index with the new entries - for SerializedBatchOffset { - key, - lsn, - index_entry, - } in offsets - { + for meta in metadata { + let SerializedValueMeta { + key, + lsn, + batch_offset, + len, + will_init, + } = match meta { + ValueMeta::Serialized(ser) => ser, + ValueMeta::Observed(_) => { + continue; + } + }; + + // Add the base_offset to the batch's index entries which are relative to the batch start. 
+ let index_entry = IndexEntry::new(IndexEntryNewArgs { + base_offset, + batch_offset, + len, + will_init, + })?; + let vec_map = inner.index.entry(key).or_default(); let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0; if old.is_some() { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 12919866a3..ee823beca8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -24,8 +24,8 @@ use offload::OffloadError; use once_cell::sync::Lazy; use pageserver_api::{ key::{ - CompactKey, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, - NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE, + KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE, + NON_INHERITED_SPARSE_RANGE, }, keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning}, models::{ @@ -49,6 +49,7 @@ use utils::{ fs_ext, pausable_failpoint, sync::gate::{Gate, GateGuard}, }; +use wal_decoder::serialized_batch::SerializedValueBatch; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; @@ -131,7 +132,6 @@ use crate::task_mgr::TaskKind; use crate::tenant::gc_result::GcResult; use crate::ZERO_PAGE; use pageserver_api::key::Key; -use pageserver_api::value::Value; use self::delete::DeleteTimelineFlow; pub(super) use self::eviction_task::EvictionTaskTenantState; @@ -141,9 +141,7 @@ use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::{ - config::TenantConf, - storage_layer::{inmemory_layer, LayerVisibilityHint}, - upload_queue::NotInitialized, + config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized, MaybeOffloaded, }; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; @@ -157,6 +155,9 @@ use super::{ GcError, }; +#[cfg(test)] +use pageserver_api::value::Value; + #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) enum FlushLoopState { NotStarted, @@ -5736,23 +5737,22 @@ impl<'a> TimelineWriter<'a> { /// Put a batch of keys at the specified Lsns. pub(crate) async fn put_batch( &mut self, - batch: Vec<(CompactKey, Lsn, usize, Value)>, + batch: SerializedValueBatch, ctx: &RequestContext, ) -> anyhow::Result<()> { if batch.is_empty() { return Ok(()); } - let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch)?; - let batch_max_lsn = serialized_batch.max_lsn; - let buf_size: u64 = serialized_batch.raw.len() as u64; + let batch_max_lsn = batch.max_lsn; + let buf_size: u64 = batch.buffer_size() as u64; let action = self.get_open_layer_action(batch_max_lsn, buf_size); let layer = self .handle_open_layer_action(batch_max_lsn, action, ctx) .await?; - let res = layer.put_batch(serialized_batch, ctx).await; + let res = layer.put_batch(batch, ctx).await; if res.is_ok() { // Update the current size only when the entire write was ok. 
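For reference, a minimal sketch (not part of the patch) of the offset translation performed in `InMemoryLayer::put_batch` above: offsets recorded in the batch metadata are relative to the start of `SerializedValueBatch::raw`, and only become absolute ephemeral-file positions once the batch's write location (`base_offset`, the file length before the write) is known. Field names follow the patch; `IndexEntryNewArgs` lives in `inmemory_layer.rs`.

    fn index_args_for(meta: &SerializedValueMeta, base_offset: u64) -> IndexEntryNewArgs {
        IndexEntryNewArgs {
            base_offset,                     // ephemeral file length before the batch write
            batch_offset: meta.batch_offset, // relative offset inside the batch buffer
            len: meta.len,
            will_init: meta.will_init,
        }
    }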
@@ -5787,11 +5787,14 @@ impl<'a> TimelineWriter<'a> {
             );
         }
         let val_ser_size = value.serialized_size().unwrap() as usize;
-        self.put_batch(
-            vec![(key.to_compact(), lsn, val_ser_size, value.clone())],
-            ctx,
-        )
-        .await
+        let batch = SerializedValueBatch::from_values(vec![(
+            key.to_compact(),
+            lsn,
+            val_ser_size,
+            value.clone(),
+        )]);
+
+        self.put_batch(batch, ctx).await
     }
 
     pub(crate) async fn delete_batch(
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index eb19fb691f..34bf959058 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -331,11 +331,11 @@ pub(super) async fn handle_walreceiver_connection(
                         Ok(())
                     }
 
-                    while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
+                    while let Some((record_end_lsn, recdata)) = waldecoder.poll_decode()? {
                         // It is important to deal with the aligned records as lsn in getPage@LSN is
                         // aligned and can be several bytes bigger. Without this alignment we are
                         // at risk of hitting a deadlock.
-                        if !lsn.is_aligned() {
+                        if !record_end_lsn.is_aligned() {
                             return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
                         }
 
@@ -343,7 +343,7 @@ pub(super) async fn handle_walreceiver_connection(
                         let interpreted = InterpretedWalRecord::from_bytes_filtered(
                             recdata,
                             modification.tline.get_shard_identity(),
-                            lsn,
+                            record_end_lsn,
                             modification.tline.pg_version,
                         )?;
 
@@ -366,9 +366,11 @@ pub(super) async fn handle_walreceiver_connection(
                         let ingested = walingest
                             .ingest_record(interpreted, &mut modification, &ctx)
                             .await
-                            .with_context(|| format!("could not ingest record at {lsn}"))?;
+                            .with_context(|| {
+                                format!("could not ingest record at {record_end_lsn}")
+                            })?;
                         if !ingested {
-                            tracing::debug!("ingest: filtered out record @ LSN {lsn}");
+                            tracing::debug!("ingest: filtered out record @ LSN {record_end_lsn}");
                             WAL_INGEST.records_filtered.inc();
                             filtered_records += 1;
                         }
@@ -378,7 +380,7 @@ pub(super) async fn handle_walreceiver_connection(
                         // to timeout the tests.
                         fail_point!("walreceiver-after-ingest");
 
-                        last_rec_lsn = lsn;
+                        last_rec_lsn = record_end_lsn;
 
                         // Commit every ingest_batch_size records. Even if we filtered out
                         // all records, we still need to call commit to advance the LSN.
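
Commentary (not part of the patch): the walreceiver hunks above only rename `lsn` to `record_end_lsn`, but the surrounding loop is easy to lose in a diff this size. The sketch below mirrors that control flow (alignment check, interpret, ingest, periodic commit) with stand-in types and functions; `interpret` and `ingest` are hypothetical placeholders for `InterpretedWalRecord::from_bytes_filtered` and `WalIngest::ingest_record`, and only the loop shape is taken from the diff.

```rust
#[derive(Clone, Copy, Debug)]
struct Lsn(u64);

impl Lsn {
    /// WAL record boundaries are MAXALIGNed (8 bytes on supported platforms).
    fn is_aligned(&self) -> bool {
        self.0 % 8 == 0
    }
}

struct Interpreted {
    end_lsn: Lsn,
    // metadata_record, batch, ... elided in this sketch
}

/// Stand-in for `InterpretedWalRecord::from_bytes_filtered`.
fn interpret(_recdata: &[u8], record_end_lsn: Lsn) -> Interpreted {
    Interpreted {
        end_lsn: record_end_lsn,
    }
}

/// Stand-in for `WalIngest::ingest_record`: returns whether anything was stored.
fn ingest(rec: &Interpreted) -> bool {
    rec.end_lsn.0 % 16 == 0 // placeholder shard-filtering decision
}

fn main() {
    // Pretend the decoder produced these (record_end_lsn, raw bytes) pairs.
    let records: Vec<(Lsn, Vec<u8>)> = (1u64..=10).map(|i| (Lsn(i * 8), vec![0u8; 8])).collect();

    let ingest_batch_size = 4;
    let mut uncommitted = 0;
    let mut last_rec_lsn = Lsn(0);

    for (record_end_lsn, recdata) in records {
        // getPage@LSN requests use aligned LSNs; an unaligned record end would risk a deadlock.
        assert!(record_end_lsn.is_aligned(), "LSN not aligned");

        let interpreted = interpret(&recdata, record_end_lsn);
        if !ingest(&interpreted) {
            println!("ingest: filtered out record @ LSN {record_end_lsn:?}");
        }
        last_rec_lsn = record_end_lsn;
        uncommitted += 1;

        // Commit every `ingest_batch_size` records: even if everything was filtered,
        // the advertised LSN still has to advance.
        if uncommitted >= ingest_batch_size {
            println!("commit up to {last_rec_lsn:?}");
            uncommitted = 0;
        }
    }
    if uncommitted > 0 {
        println!("final commit up to {last_rec_lsn:?}");
    }
}
```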
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 84353970b7..c3ccd8a2e4 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -28,14 +28,13 @@ use std::time::Duration;
 use std::time::Instant;
 use std::time::SystemTime;
 
-use pageserver_api::key::Key;
 use pageserver_api::shard::ShardIdentity;
 use postgres_ffi::fsm_logical_to_physical;
 use postgres_ffi::walrecord::*;
 use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
 use wal_decoder::models::*;
 
-use anyhow::{bail, Context, Result};
+use anyhow::{bail, Result};
 use bytes::{Buf, Bytes};
 use tracing::*;
 use utils::failpoint_support;
@@ -51,7 +50,6 @@ use crate::ZERO_PAGE;
 use pageserver_api::key::rel_block_to_key;
 use pageserver_api::record::NeonWalRecord;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
-use pageserver_api::value::Value;
 use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::TransactionId;
@@ -156,12 +154,12 @@ impl WalIngest {
         WAL_INGEST.records_received.inc();
         let prev_len = modification.len();
 
-        modification.set_lsn(interpreted.lsn)?;
+        modification.set_lsn(interpreted.end_lsn)?;
 
         if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
             // Records of this type should always be preceded by a commit(), as they
             // rely on reading data pages back from the Timeline.
-            assert!(!modification.has_dirty_data_pages());
+            assert!(!modification.has_dirty_data());
         }
 
         assert!(!self.checkpoint_modified);
@@ -275,28 +273,9 @@ impl WalIngest {
             }
         }
 
-        // Iterate through all the key value pairs provided in the interpreted block
-        // and update the modification currently in-flight to include them.
-        for (compact_key, maybe_value) in interpreted.blocks.into_iter() {
-            let (rel, blk) = Key::from_compact(compact_key).to_rel_block()?;
-            match maybe_value {
-                Some(Value::Image(img)) => {
-                    self.put_rel_page_image(modification, rel, blk, img, ctx)
-                        .await?;
-                }
-                Some(Value::WalRecord(rec)) => {
-                    self.put_rel_wal_record(modification, rel, blk, rec, ctx)
-                        .await?;
-                }
-                None => {
-                    // Shard 0 tracks relation sizes. We will observe
-                    // its blkno in case it implicitly extends a relation.
-                    assert!(self.shard.is_shard_zero());
-                    self.observe_decoded_block(modification, rel, blk, ctx)
-                        .await?;
-                }
-            }
-        }
+        modification
+            .ingest_batch(interpreted.batch, &self.shard, ctx)
+            .await?;
 
         // If checkpoint data was updated, store the new version in the repository
         if self.checkpoint_modified {
@@ -310,8 +289,6 @@ impl WalIngest {
         // until commit() is called to flush the data into the repository and update
         // the latest LSN.
 
-        modification.on_record_end();
-
         Ok(modification.len() > prev_len)
     }
 
@@ -334,17 +311,6 @@ impl WalIngest {
         Ok((epoch as u64) << 32 | xid as u64)
     }
 
-    /// Do not store this block, but observe it for the purposes of updating our relation size state.
-    async fn observe_decoded_block(
-        &mut self,
-        modification: &mut DatadirModification<'_>,
-        rel: RelTag,
-        blkno: BlockNumber,
-        ctx: &RequestContext,
-    ) -> Result<(), PageReconstructError> {
-        self.handle_rel_extend(modification, rel, blkno, ctx).await
-    }
-
     async fn ingest_clear_vm_bits(
         &mut self,
         clear_vm_bits: ClearVmBits,
@@ -1248,6 +1214,7 @@ impl WalIngest {
         Ok(())
     }
 
+    #[cfg(test)]
     async fn put_rel_page_image(
         &mut self,
         modification: &mut DatadirModification<'_>,
@@ -1297,36 +1264,7 @@ impl WalIngest {
         let new_nblocks = blknum + 1;
         // Check if the relation exists. We implicitly create relations on first
         // record.
-        // TODO: would be nice if to be more explicit about it
-
-        // Get current size and put rel creation if rel doesn't exist
-        //
-        // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
-        //       check the cache too. This is because eagerly checking the cache results in
-        //       less work overall and 10% better performance. It's more work on cache miss
-        //       but cache miss is rare.
-        let old_nblocks = if let Some(nblocks) = modification
-            .tline
-            .get_cached_rel_size(&rel, modification.get_lsn())
-        {
-            nblocks
-        } else if !modification
-            .tline
-            .get_rel_exists(rel, Version::Modified(modification), ctx)
-            .await?
-        {
-            // create it with 0 size initially, the logic below will extend it
-            modification
-                .put_rel_creation(rel, 0, ctx)
-                .await
-                .context("Relation Error")?;
-            0
-        } else {
-            modification
-                .tline
-                .get_rel_size(rel, Version::Modified(modification), ctx)
-                .await?
-        };
+        let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
 
         if new_nblocks > old_nblocks {
             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
@@ -1553,25 +1491,21 @@ mod tests {
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         let mut m = tline.begin_modification(Lsn(0x30));
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         let mut m = tline.begin_modification(Lsn(0x40));
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         let mut m = tline.begin_modification(Lsn(0x50));
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
 
         assert_current_logical_size(&tline, Lsn(0x50));
@@ -1713,7 +1647,6 @@ mod tests {
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         assert_eq!(
             tline
@@ -1739,7 +1672,6 @@ mod tests {
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         assert_eq!(
             tline