neon/libs/wal_decoder/src/models.rs
Vlad Lazar 4dfa0c221b pageserver: ingest pre-serialized batches of values (#9579)
## Problem

https://github.com/neondatabase/neon/pull/9524 split the decoding and
interpretation step from ingestion.
The output of the first phase is a `wal_decoder::models::InterpretedWalRecord`. 
Before this patch set that struct contained a list of `Value` instances.

We want to lift the decoding and interpretation step to the safekeeper.
Ideally, the safekeeper would then hand the pageserver a batch containing the raw, pre-serialized data instead of a list of `Value` instances.

## Summary of changes

The main goal here is to make `InterpretedWalRecord` hold a raw buffer which
contains pre-serialized `Value`s.
To get there, we:
1. Add a `SerializedValueBatch` type. This is `inmemory_layer::SerializedBatch` with some 
extra functionality for extension, observing values for shard 0 and tests.
2. Replace `inmemory_layer::SerializedBatch` with `SerializedValueBatch`
3. Make `DatadirModification` maintain a `SerializedValueBatch`.
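
To make the idea of a raw buffer of pre-serialized values concrete, here is a minimal sketch. It is not the actual `SerializedValueBatch` layout: `SketchBatch`, `ValueMeta`, and the `u64` stand-ins for keys and LSNs are hypothetical names chosen for illustration. It only shows the general shape of one contiguous buffer plus per-value metadata, and how extending one batch with another has to shift offsets.

```rust
/// Hypothetical stand-in for a storage key; the real key type is richer.
type Key = u64;
/// Hypothetical stand-in for an LSN.
type Lsn = u64;

/// Where one serialized value lives inside the shared raw buffer.
#[derive(Debug, Clone, PartialEq)]
pub struct ValueMeta {
    pub key: Key,
    pub lsn: Lsn,
    pub offset: usize,
    pub len: usize,
}

/// A batch of pre-serialized values: one contiguous buffer plus an index.
#[derive(Debug, Default)]
pub struct SketchBatch {
    pub raw: Vec<u8>,
    pub metadata: Vec<ValueMeta>,
}

impl SketchBatch {
    /// Append one already-serialized value to the batch.
    pub fn put(&mut self, key: Key, lsn: Lsn, serialized: &[u8]) {
        let offset = self.raw.len();
        self.raw.extend_from_slice(serialized);
        self.metadata.push(ValueMeta {
            key,
            lsn,
            offset,
            len: serialized.len(),
        });
    }

    /// Merge another batch into this one, shifting its offsets so they
    /// keep pointing at the right bytes in the combined buffer.
    pub fn extend(&mut self, other: SketchBatch) {
        let base = self.raw.len();
        self.raw.extend_from_slice(&other.raw);
        self.metadata.extend(other.metadata.into_iter().map(|mut m| {
            m.offset += base;
            m
        }));
    }

    /// Bytes of the value described by `meta`.
    pub fn value_bytes(&self, meta: &ValueMeta) -> &[u8] {
        &self.raw[meta.offset..meta.offset + meta.len]
    }
}
```

A consumer holding such a batch can write `raw` out in one go and use `metadata` purely as an index, which is the property that makes accumulating a batch across several WAL records cheap.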


### `DatadirModification` changes

`DatadirModification` now maintains a `SerializedValueBatch` and extends
it as new WAL records come in (to avoid flushing to disk on every
record).
In turn, this cascaded into a number of modifications to
`DatadirModification`:
1. Replace `pending_data_pages` and `pending_zero_data_pages` with `pending_data_batch`.
2. Remove `pending_zero_data_pages` and its cousin `on_wal_record_end`.
3. Rename `pending_bytes` to `pending_metadata_bytes`, since that is what it tracks now.
4. Adapt various utility methods such as `len`, `approx_pending_bytes` and `has_dirty_data_pages`.

Removal of `pending_zero_data_pages` and the optimisation associated
with it ((1) and (2)) deserves more detail.

Previously, all zero data pages went through `pending_zero_data_pages`.
We wrote zero data pages when filling gaps caused by relation extension
(case A) and when handling special WAL records (case B). If the same WAL
record also contained a non-zero write for an entry in
`pending_zero_data_pages`, we skipped the zero write.
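
As a rough model of the old behaviour described above (not the removed code itself), the sketch below keeps pending zero pages in a separate set and drops an entry when a non-zero write for the same key arrives within the same record. `PendingPages`, its methods, the `u64` keys, and `PAGE_SZ` are all hypothetical names for illustration.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Page size assumed for the sketch (the Postgres default block size).
const PAGE_SZ: usize = 8192;

/// Hypothetical model of the old pending state for one WAL record.
#[derive(Default)]
pub struct PendingPages {
    /// Keys that should be written as zero pages unless overwritten.
    zero: BTreeSet<u64>,
    /// Keys with an actual page image queued for writing.
    data: BTreeMap<u64, Vec<u8>>,
}

impl PendingPages {
    /// Queue a zero page, e.g. when filling a gap after relation extension.
    pub fn put_zero_page(&mut self, key: u64) {
        self.zero.insert(key);
    }

    /// Queue a real page image. If a zero page was pending for the same
    /// key, it is dropped: this is the optimisation in question.
    pub fn put_page(&mut self, key: u64, image: Vec<u8>) {
        self.zero.remove(&key);
        self.data.insert(key, image);
    }

    /// At the end of the record, the surviving zero pages become writes.
    pub fn on_record_end(&mut self) {
        for key in std::mem::take(&mut self.zero) {
            self.data.entry(key).or_insert_with(|| vec![0u8; PAGE_SZ]);
        }
    }

    /// Pages queued for writing so far.
    pub fn data(&self) -> &BTreeMap<u64, Vec<u8>> {
        &self.data
    }
}
```

In this model the zero write for a key is only materialised at the end of the record, which is why a separate end-of-record hook was needed.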

Case A: We handle this differently now. When ingesting the
`SerializedValueBatch` associated with one PG WAL record, we identify the gaps and fill
them in one go. Essentially, we move from a per-key process (gaps were filled after each
new key) to a per-record process. Hence, the optimisation is no longer
required.
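
A minimal sketch of the per-record idea, under simplifying assumptions: a relation is modelled as a block count, a record as the list of block numbers it writes, and `gaps_for_record` is a hypothetical helper, not a function from the codebase. Because the gaps are computed once from the full set of blocks in the record, a block that the record writes is never a candidate for zero-filling.

```rust
use std::collections::BTreeSet;

/// Blocks that must be zero-filled after applying one record's writes.
///
/// `current_nblocks` is the relation size before the record, so blocks
/// `0..current_nblocks` already exist. Any block between the old end of
/// the relation and the highest block written by the record, which the
/// record does not write itself, is a gap.
pub fn gaps_for_record(current_nblocks: u32, written_blocks: &[u32]) -> Vec<u32> {
    let written: BTreeSet<u32> = written_blocks.iter().copied().collect();
    let max_written = match written.iter().next_back() {
        Some(&blk) => blk,
        // A record without block writes cannot extend the relation.
        None => return Vec::new(),
    };
    (current_nblocks..max_written)
        .filter(|blk| !written.contains(blk))
        .collect()
}
```

For example, with three existing blocks and a record writing blocks 5 and 7, `gaps_for_record(3, &[5, 7])` yields blocks 3, 4 and 6; block 5 is skipped without any bookkeeping for pending zero pages.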

Case B: When the handling of a special record needs to zero out a key,
it just adds that to the current batch. I inspected the code, and I
don't think the optimisation kicked in here.
2024-11-06 14:10:32 +00:00


//! This module houses types which represent decoded PG WAL records
//! ready for the pageserver to interpret. They are derived from the original
//! WAL records, so that each struct corresponds closely to one WAL record of
//! a specific kind. They contain the same information as the original WAL records,
//! but the values are already serialized in a [`SerializedValueBatch`], which
//! is the format that the pageserver is expecting them in.
//!
//! The ingestion code uses these structs to help with parsing the WAL records,
//! and it splits them into a stream of modifications to the key-value pairs that
//! are ultimately stored in delta layers. See also the split-out counterparts in
//! [`postgres_ffi::walrecord`].
//!
//! The pipeline which processes WAL records is not super obvious, so let's follow
//! the flow of an example XACT_COMMIT Postgres record:
//!
//! (Postgres XACT_COMMIT record)
//! |
//! |--> pageserver::walingest::WalIngest::decode_xact_record
//!      |
//!      |--> ([`XactRecord::Commit`])
//!           |
//!           |--> pageserver::walingest::WalIngest::ingest_xact_record
//!                |
//!                |--> (NeonWalRecord::ClogSetCommitted)
//!                     |
//!                     |--> write to KV store within the pageserver

use bytes::Bytes;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::walrecord::{
    XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
    XlSmgrTruncate, XlXactParsedRecord,
};
use postgres_ffi::{Oid, TransactionId};
use utils::lsn::Lsn;

use crate::serialized_batch::SerializedValueBatch;

pub enum FlushUncommittedRecords {
    Yes,
    No,
}

/// An interpreted Postgres WAL record, ready to be handled by the pageserver
pub struct InterpretedWalRecord {
    /// Optional metadata record - may cause writes to metadata keys
    /// in the storage engine
    pub metadata_record: Option<MetadataRecord>,
    /// A pre-serialized batch along with the required metadata for ingestion
    /// by the pageserver
    pub batch: SerializedValueBatch,
    /// Byte offset within WAL for the end of the original PG WAL record
    pub end_lsn: Lsn,
    /// Whether to flush all uncommitted modifications to the storage engine
    /// before ingesting this record. This is currently only used for legacy PG
    /// database creations which read pages from a template database. Such WAL
    /// records require reading data blocks while ingesting, hence the need to flush.
    pub flush_uncommitted: FlushUncommittedRecords,
    /// Transaction id of the original PG WAL record
    pub xid: TransactionId,
}

/// The interpreted part of the Postgres WAL record which requires metadata
/// writes to the underlying storage engine.
pub enum MetadataRecord {
    Heapam(HeapamRecord),
    Neonrmgr(NeonrmgrRecord),
    Smgr(SmgrRecord),
    Dbase(DbaseRecord),
    Clog(ClogRecord),
    Xact(XactRecord),
    MultiXact(MultiXactRecord),
    Relmap(RelmapRecord),
    Xlog(XlogRecord),
    LogicalMessage(LogicalMessageRecord),
    Standby(StandbyRecord),
    Replorigin(ReploriginRecord),
}

pub enum HeapamRecord {
    ClearVmBits(ClearVmBits),
}

pub struct ClearVmBits {
    pub new_heap_blkno: Option<u32>,
    pub old_heap_blkno: Option<u32>,
    pub vm_rel: RelTag,
    pub flags: u8,
}

pub enum NeonrmgrRecord {
    ClearVmBits(ClearVmBits),
}

pub enum SmgrRecord {
    Create(SmgrCreate),
    Truncate(XlSmgrTruncate),
}

pub struct SmgrCreate {
    pub rel: RelTag,
}

pub enum DbaseRecord {
    Create(DbaseCreate),
    Drop(DbaseDrop),
}

pub struct DbaseCreate {
    pub db_id: Oid,
    pub tablespace_id: Oid,
    pub src_db_id: Oid,
    pub src_tablespace_id: Oid,
}

pub struct DbaseDrop {
    pub db_id: Oid,
    pub tablespace_ids: Vec<Oid>,
}

pub enum ClogRecord {
    ZeroPage(ClogZeroPage),
    Truncate(ClogTruncate),
}

pub struct ClogZeroPage {
    pub segno: u32,
    pub rpageno: u32,
}

pub struct ClogTruncate {
    pub pageno: u32,
    pub oldest_xid: TransactionId,
    pub oldest_xid_db: Oid,
}

pub enum XactRecord {
    Commit(XactCommon),
    Abort(XactCommon),
    CommitPrepared(XactCommon),
    AbortPrepared(XactCommon),
    Prepare(XactPrepare),
}

pub struct XactCommon {
    pub parsed: XlXactParsedRecord,
    pub origin_id: u16,
    // Fields below are only used for logging
    pub xl_xid: TransactionId,
    pub lsn: Lsn,
}

pub struct XactPrepare {
    pub xl_xid: TransactionId,
    pub data: Bytes,
}

pub enum MultiXactRecord {
    ZeroPage(MultiXactZeroPage),
    Create(XlMultiXactCreate),
    Truncate(XlMultiXactTruncate),
}

pub struct MultiXactZeroPage {
    pub slru_kind: SlruKind,
    pub segno: u32,
    pub rpageno: u32,
}

pub enum RelmapRecord {
    Update(RelmapUpdate),
}

pub struct RelmapUpdate {
    pub update: XlRelmapUpdate,
    pub buf: Bytes,
}

pub enum XlogRecord {
    Raw(RawXlogRecord),
}

pub struct RawXlogRecord {
    pub info: u8,
    pub lsn: Lsn,
    pub buf: Bytes,
}

pub enum LogicalMessageRecord {
    Put(PutLogicalMessage),
    #[cfg(feature = "testing")]
    Failpoint,
}

pub struct PutLogicalMessage {
    pub path: String,
    pub buf: Bytes,
}

pub enum StandbyRecord {
    RunningXacts(StandbyRunningXacts),
}

pub struct StandbyRunningXacts {
    pub oldest_running_xid: TransactionId,
}

pub enum ReploriginRecord {
    Set(XlReploriginSet),
    Drop(XlReploriginDrop),
}