Files
neon/libs/wal_decoder/src/models.rs
Vlad Lazar 9e0148de11 safekeeper: use protobuf for sending compressed records to pageserver (#9821)
## Problem

https://github.com/neondatabase/neon/pull/9746 lifted decoding and
interpretation of WAL to the safekeeper.
This reduced the ingested amount on the pageservers by around 10x for a
tenant with 8 shards, but doubled
the ingested amount for single sharded tenants.

Also, https://github.com/neondatabase/neon/pull/9746 uses bincode which
doesn't support schema evolution.
Technically the schema can be evolved, but it's very cumbersome.

## Summary of changes

This patch set addresses both problems by adding protobuf support for
the interpreted wal records and adding compression support. Compressed
protobuf reduced the ingested amount by 100x on the 32 shards
`test_sharded_ingest` case (compared to non-interpreted proto). For the
1 shard case the reduction is 5x.

Sister change to `rust-postgres` is
[here](https://github.com/neondatabase/rust-postgres/pull/33).

## Links

Related: https://github.com/neondatabase/neon/issues/9336
Epic: https://github.com/neondatabase/neon/issues/9329
2024-11-27 12:12:21 +00:00

275 lines
7.5 KiB
Rust

//! This module houses types which represent decoded PG WAL records
//! ready for the pageserver to interpret. They are derived from the original
//! WAL records, so that each struct corresponds closely to one WAL record of
//! a specific kind. They contain the same information as the original WAL records,
//! but the values are already serialized in a [`SerializedValueBatch`], which
//! is the format that the pageserver is expecting them in.
//!
//! The ingestion code uses these structs to help with parsing the WAL records,
//! and it splits them into a stream of modifications to the key-value pairs that
//! are ultimately stored in delta layers. See also the split-out counterparts in
//! [`postgres_ffi::walrecord`].
//!
//! The pipeline which processes WAL records is not super obvious, so let's follow
//! the flow of an example XACT_COMMIT Postgres record:
//!
//! (Postgres XACT_COMMIT record)
//! |
//! |--> pageserver::walingest::WalIngest::decode_xact_record
//! |
//! |--> ([`XactRecord::Commit`])
//! |
//! |--> pageserver::walingest::WalIngest::ingest_xact_record
//! |
//! |--> (NeonWalRecord::ClogSetCommitted)
//! |
//! |--> write to KV store within the pageserver
use bytes::Bytes;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::walrecord::{
XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
XlSmgrTruncate, XlXactParsedRecord,
};
use postgres_ffi::{Oid, TransactionId};
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
use crate::serialized_batch::SerializedValueBatch;
// Code generated by protobuf.
pub mod proto {
// Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
// we don't use these types for anything but broker data transmission,
// so it's ok to ignore this one.
#![allow(clippy::derive_partial_eq_without_eq)]
// The generated ValueMeta has a `len` method generate for its `len` field.
#![allow(clippy::len_without_is_empty)]
tonic::include_proto!("interpreted_wal");
}
#[derive(Serialize, Deserialize)]
pub enum FlushUncommittedRecords {
Yes,
No,
}
/// A batch of interpreted WAL records
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecords {
pub records: Vec<InterpretedWalRecord>,
// Start LSN of the next record after the batch.
// Note that said record may not belong to the current shard.
pub next_record_lsn: Option<Lsn>,
}
/// An interpreted Postgres WAL record, ready to be handled by the pageserver
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecord {
/// Optional metadata record - may cause writes to metadata keys
/// in the storage engine
pub metadata_record: Option<MetadataRecord>,
/// A pre-serialized batch along with the required metadata for ingestion
/// by the pageserver
pub batch: SerializedValueBatch,
/// Byte offset within WAL for the start of the next PG WAL record.
/// Usually this is the end LSN of the current record, but in case of
/// XLOG SWITCH records it will be within the next segment.
pub next_record_lsn: Lsn,
/// Whether to flush all uncommitted modifications to the storage engine
/// before ingesting this record. This is currently only used for legacy PG
/// database creations which read pages from a template database. Such WAL
/// records require reading data blocks while ingesting, hence the need to flush.
pub flush_uncommitted: FlushUncommittedRecords,
/// Transaction id of the original PG WAL record
pub xid: TransactionId,
}
impl InterpretedWalRecord {
/// Checks if the WAL record is empty
///
/// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
/// pageserver.
pub fn is_empty(&self) -> bool {
self.batch.is_empty()
&& self.metadata_record.is_none()
&& matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
}
}
/// The interpreted part of the Postgres WAL record which requires metadata
/// writes to the underlying storage engine.
#[derive(Serialize, Deserialize)]
pub enum MetadataRecord {
Heapam(HeapamRecord),
Neonrmgr(NeonrmgrRecord),
Smgr(SmgrRecord),
Dbase(DbaseRecord),
Clog(ClogRecord),
Xact(XactRecord),
MultiXact(MultiXactRecord),
Relmap(RelmapRecord),
Xlog(XlogRecord),
LogicalMessage(LogicalMessageRecord),
Standby(StandbyRecord),
Replorigin(ReploriginRecord),
}
#[derive(Serialize, Deserialize)]
pub enum HeapamRecord {
ClearVmBits(ClearVmBits),
}
#[derive(Serialize, Deserialize)]
pub struct ClearVmBits {
pub new_heap_blkno: Option<u32>,
pub old_heap_blkno: Option<u32>,
pub vm_rel: RelTag,
pub flags: u8,
}
#[derive(Serialize, Deserialize)]
pub enum NeonrmgrRecord {
ClearVmBits(ClearVmBits),
}
#[derive(Serialize, Deserialize)]
pub enum SmgrRecord {
Create(SmgrCreate),
Truncate(XlSmgrTruncate),
}
#[derive(Serialize, Deserialize)]
pub struct SmgrCreate {
pub rel: RelTag,
}
#[derive(Serialize, Deserialize)]
pub enum DbaseRecord {
Create(DbaseCreate),
Drop(DbaseDrop),
}
#[derive(Serialize, Deserialize)]
pub struct DbaseCreate {
pub db_id: Oid,
pub tablespace_id: Oid,
pub src_db_id: Oid,
pub src_tablespace_id: Oid,
}
#[derive(Serialize, Deserialize)]
pub struct DbaseDrop {
pub db_id: Oid,
pub tablespace_ids: Vec<Oid>,
}
#[derive(Serialize, Deserialize)]
pub enum ClogRecord {
ZeroPage(ClogZeroPage),
Truncate(ClogTruncate),
}
#[derive(Serialize, Deserialize)]
pub struct ClogZeroPage {
pub segno: u32,
pub rpageno: u32,
}
#[derive(Serialize, Deserialize)]
pub struct ClogTruncate {
pub pageno: u32,
pub oldest_xid: TransactionId,
pub oldest_xid_db: Oid,
}
#[derive(Serialize, Deserialize)]
pub enum XactRecord {
Commit(XactCommon),
Abort(XactCommon),
CommitPrepared(XactCommon),
AbortPrepared(XactCommon),
Prepare(XactPrepare),
}
#[derive(Serialize, Deserialize)]
pub struct XactCommon {
pub parsed: XlXactParsedRecord,
pub origin_id: u16,
// Fields below are only used for logging
pub xl_xid: TransactionId,
pub lsn: Lsn,
}
#[derive(Serialize, Deserialize)]
pub struct XactPrepare {
pub xl_xid: TransactionId,
pub data: Bytes,
}
#[derive(Serialize, Deserialize)]
pub enum MultiXactRecord {
ZeroPage(MultiXactZeroPage),
Create(XlMultiXactCreate),
Truncate(XlMultiXactTruncate),
}
#[derive(Serialize, Deserialize)]
pub struct MultiXactZeroPage {
pub slru_kind: SlruKind,
pub segno: u32,
pub rpageno: u32,
}
#[derive(Serialize, Deserialize)]
pub enum RelmapRecord {
Update(RelmapUpdate),
}
#[derive(Serialize, Deserialize)]
pub struct RelmapUpdate {
pub update: XlRelmapUpdate,
pub buf: Bytes,
}
#[derive(Serialize, Deserialize)]
pub enum XlogRecord {
Raw(RawXlogRecord),
}
#[derive(Serialize, Deserialize)]
pub struct RawXlogRecord {
pub info: u8,
pub lsn: Lsn,
pub buf: Bytes,
}
#[derive(Serialize, Deserialize)]
pub enum LogicalMessageRecord {
Put(PutLogicalMessage),
#[cfg(feature = "testing")]
Failpoint,
}
#[derive(Serialize, Deserialize)]
pub struct PutLogicalMessage {
pub path: String,
pub buf: Bytes,
}
#[derive(Serialize, Deserialize)]
pub enum StandbyRecord {
RunningXacts(StandbyRunningXacts),
}
#[derive(Serialize, Deserialize)]
pub struct StandbyRunningXacts {
pub oldest_running_xid: TransactionId,
}
#[derive(Serialize, Deserialize)]
pub enum ReploriginRecord {
Set(XlReploriginSet),
Drop(XlReploriginDrop),
}