From 07b974480c642bc79a63cfd0d456a607533fe966 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 29 Oct 2024 10:00:34 +0000 Subject: [PATCH] pageserver: move things around to prepare for decoding logic (#9504) ## Problem We wish to have high level WAL decoding logic in `wal_decoder::decoder` module. ## Summary of Changes For this we need the `Value` and `NeonWalRecord` types accessible there, so: 1. Move `Value` and `NeonWalRecord` to `pageserver::value` and `pageserver::record` respectively. 2. Get rid of `pageserver::repository` (follow up from (1)) 3. Move PG specific WAL record types to `postgres_ffi::walrecord`. In theory they could live in `wal_decoder`, but it would create a circular dependency between `wal_decoder` and `postgres_ffi`. Long term it makes sense for those types to be PG version specific, so that will work out nicely. 4. Move higher level WAL record types (to be ingested by pageserver) into `wal_decoder::models` Related: https://github.com/neondatabase/neon/issues/9335 Epic: https://github.com/neondatabase/neon/issues/9329 --- Cargo.lock | 16 + Cargo.toml | 2 + libs/pageserver_api/src/lib.rs | 2 + libs/pageserver_api/src/record.rs | 113 +++ .../pageserver_api/src/value.rs | 80 +- libs/postgres_ffi/Cargo.toml | 1 + libs/postgres_ffi/src/lib.rs | 1 + .../postgres_ffi}/src/walrecord.rs | 942 ++++++++---------- libs/wal_decoder/Cargo.toml | 18 + libs/wal_decoder/src/decoder.rs | 1 + libs/wal_decoder/src/lib.rs | 2 + libs/wal_decoder/src/models.rs | 167 ++++ pageserver/Cargo.toml | 3 +- pageserver/benches/bench_ingest.rs | 3 +- pageserver/benches/bench_layer_map.rs | 2 +- pageserver/benches/bench_walredo.rs | 3 +- pageserver/ctl/src/draw_timeline_dir.rs | 2 +- pageserver/ctl/src/layer_map_analyzer.rs | 2 +- pageserver/ctl/src/layers.rs | 2 +- pageserver/src/deletion_queue.rs | 3 +- pageserver/src/http/routes.rs | 4 +- pageserver/src/import_datadir.rs | 3 +- pageserver/src/lib.rs | 2 - pageserver/src/pgdatadir_mapping.rs | 6 +- pageserver/src/tenant.rs | 32 +- pageserver/src/tenant/gc_result.rs | 57 ++ pageserver/src/tenant/layer_map.rs | 2 +- pageserver/src/tenant/mgr.rs | 2 +- pageserver/src/tenant/storage_layer.rs | 4 +- .../storage_layer/batch_split_writer.rs | 3 +- .../src/tenant/storage_layer/delta_layer.rs | 14 +- .../tenant/storage_layer/filter_iterator.rs | 4 +- .../src/tenant/storage_layer/image_layer.rs | 5 +- .../tenant/storage_layer/inmemory_layer.rs | 3 +- .../src/tenant/storage_layer/layer/tests.rs | 4 +- .../src/tenant/storage_layer/layer_desc.rs | 2 +- .../src/tenant/storage_layer/layer_name.rs | 2 +- .../tenant/storage_layer/merge_iterator.rs | 19 +- pageserver/src/tenant/timeline.rs | 21 +- pageserver/src/tenant/timeline/compaction.rs | 9 +- .../walreceiver/walreceiver_connection.rs | 2 +- pageserver/src/walingest.rs | 151 +-- pageserver/src/walredo.rs | 9 +- pageserver/src/walredo/apply_neon.rs | 4 +- pageserver/src/walredo/process.rs | 2 +- 45 files changed, 925 insertions(+), 806 deletions(-) create mode 100644 libs/pageserver_api/src/record.rs rename pageserver/src/repository.rs => libs/pageserver_api/src/value.rs (73%) rename {pageserver => libs/postgres_ffi}/src/walrecord.rs (88%) create mode 100644 libs/wal_decoder/Cargo.toml create mode 100644 libs/wal_decoder/src/decoder.rs create mode 100644 libs/wal_decoder/src/lib.rs create mode 100644 libs/wal_decoder/src/models.rs create mode 100644 pageserver/src/tenant/gc_result.rs diff --git a/Cargo.lock b/Cargo.lock index 610b607482..c5af247e8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3749,6 +3749,7 
@@ dependencies = [ "tracing", "url", "utils", + "wal_decoder", "walkdir", "workspace_hack", ] @@ -4186,6 +4187,7 @@ dependencies = [ "regex", "serde", "thiserror", + "tracing", "utils", ] @@ -6954,6 +6956,20 @@ dependencies = [ "utils", ] +[[package]] +name = "wal_decoder" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "pageserver_api", + "postgres_ffi", + "serde", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "walkdir" version = "2.3.3" diff --git a/Cargo.toml b/Cargo.toml index 4c6a24ecde..7f9a766ff9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "libs/postgres_ffi/wal_craft", "libs/vm_monitor", "libs/walproposer", + "libs/wal_decoder", ] [workspace.package] @@ -238,6 +239,7 @@ tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" } utils = { version = "0.1", path = "./libs/utils/" } vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" } walproposer = { version = "0.1", path = "./libs/walproposer/" } +wal_decoder = { version = "0.1", path = "./libs/wal_decoder" } ## Common library dependency workspace_hack = { version = "0.1", path = "./workspace_hack/" } diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs index 532185a366..ff705e79cd 100644 --- a/libs/pageserver_api/src/lib.rs +++ b/libs/pageserver_api/src/lib.rs @@ -5,9 +5,11 @@ pub mod controller_api; pub mod key; pub mod keyspace; pub mod models; +pub mod record; pub mod reltag; pub mod shard; /// Public API types pub mod upcall_api; +pub mod value; pub mod config; diff --git a/libs/pageserver_api/src/record.rs b/libs/pageserver_api/src/record.rs new file mode 100644 index 0000000000..b80ed2f203 --- /dev/null +++ b/libs/pageserver_api/src/record.rs @@ -0,0 +1,113 @@ +//! This module defines the WAL record format used within the pageserver. + +use bytes::Bytes; +use postgres_ffi::walrecord::{describe_postgres_wal_record, MultiXactMember}; +use postgres_ffi::{MultiXactId, MultiXactOffset, TimestampTz, TransactionId}; +use serde::{Deserialize, Serialize}; +use utils::bin_ser::DeserializeError; + +/// Each update to a page is represented by a NeonWalRecord. It can be a wrapper +/// around a PostgreSQL WAL record, or a custom neon-specific "record". +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum NeonWalRecord { + /// Native PostgreSQL WAL record + Postgres { will_init: bool, rec: Bytes }, + + /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear) + ClearVisibilityMapFlags { + new_heap_blkno: Option, + old_heap_blkno: Option, + flags: u8, + }, + /// Mark transaction IDs as committed on a CLOG page + ClogSetCommitted { + xids: Vec, + timestamp: TimestampTz, + }, + /// Mark transaction IDs as aborted on a CLOG page + ClogSetAborted { xids: Vec }, + /// Extend multixact offsets SLRU + MultixactOffsetCreate { + mid: MultiXactId, + moff: MultiXactOffset, + }, + /// Extend multixact members SLRU. + MultixactMembersCreate { + moff: MultiXactOffset, + members: Vec, + }, + /// Update the map of AUX files, either writing or dropping an entry + AuxFile { + file_path: String, + content: Option, + }, + + /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it. + #[cfg(feature = "testing")] + Test { + /// Append a string to the image. + append: String, + /// Clear the image before appending. + clear: bool, + /// Treat this record as an init record. `clear` should be set to true if this field is set + /// to true. 
This record does not need the history WALs to reconstruct the page. See [`NeonWalRecord::will_init`] and
+    /// its references in `timeline.rs`.
+        will_init: bool,
+    },
+}
+
+impl NeonWalRecord {
+    /// Does replaying this WAL record initialize the page from scratch, or does
+    /// it need to be applied over the previous image of the page?
+    pub fn will_init(&self) -> bool {
+        // If you change this function, you'll also need to change ValueBytes::will_init
+        match self {
+            NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
+            #[cfg(feature = "testing")]
+            NeonWalRecord::Test { will_init, .. } => *will_init,
+            // None of the special neon record types currently initialize the page
+            _ => false,
+        }
+    }
+
+    #[cfg(feature = "testing")]
+    pub fn wal_append(s: impl AsRef<str>) -> Self {
+        Self::Test {
+            append: s.as_ref().to_string(),
+            clear: false,
+            will_init: false,
+        }
+    }
+
+    #[cfg(feature = "testing")]
+    pub fn wal_clear() -> Self {
+        Self::Test {
+            append: "".to_string(),
+            clear: true,
+            will_init: false,
+        }
+    }
+
+    #[cfg(feature = "testing")]
+    pub fn wal_init() -> Self {
+        Self::Test {
+            append: "".to_string(),
+            clear: true,
+            will_init: true,
+        }
+    }
+}
+
+/// Build a human-readable string to describe a WAL record
+///
+/// For debugging purposes
+pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
+    match rec {
+        NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
+            "will_init: {}, {}",
+            will_init,
+            describe_postgres_wal_record(rec)?
+        )),
+        _ => Ok(format!("{:?}", rec)),
+    }
+}
diff --git a/pageserver/src/repository.rs b/libs/pageserver_api/src/value.rs
similarity index 73%
rename from pageserver/src/repository.rs
rename to libs/pageserver_api/src/value.rs
index e4ebafd927..1f8ed30a9a 100644
--- a/pageserver/src/repository.rs
+++ b/libs/pageserver_api/src/value.rs
@@ -1,13 +1,16 @@
-use crate::walrecord::NeonWalRecord;
-use anyhow::Result;
+//! This module defines the value type used by the storage engine.
+//!
+//! A [`Value`] represents either a completely new value for one Key ([`Value::Image`]),
+//! or a "delta" of how to get from the previous version of the value to the new one
+//! ([`Value::WalRecord`])
+//!
+//! Note that the [`Value`] type is used for the permanent storage format, so any
+//! changes to it must be backwards compatible.
+
+use crate::record::NeonWalRecord;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
-use std::ops::AddAssign;
-use std::time::Duration;
-pub use pageserver_api::key::{Key, KEY_SIZE};
-
-/// A 'value' stored for a one Key.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub enum Value {
     /// An Image value contains a full copy of the value
@@ -20,10 +23,12 @@ pub enum Value {
 }
 
 impl Value {
+    #[inline(always)]
     pub fn is_image(&self) -> bool {
         matches!(self, Value::Image(_))
     }
 
+    #[inline(always)]
     pub fn will_init(&self) -> bool {
         match self {
             Value::Image(_) => true,
@@ -33,17 +38,18 @@ impl Value {
 }
 
 #[derive(Debug, PartialEq)]
-pub(crate) enum InvalidInput {
+pub enum InvalidInput {
     TooShortValue,
     TooShortPostgresRecord,
 }
 
 /// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, let's
 /// use this type for querying if a slice looks some particular way.
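A minimal sketch of how the relocated types fit together, assuming the patched `pageserver_api` crate plus `bytes` as dependencies (the byte payloads are made up):

```rust
use bytes::Bytes;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;

fn main() {
    // An Image is self-contained; reconstruction can start from it directly.
    let image = Value::Image(Bytes::from_static(b"full page image"));
    assert!(image.is_image() && image.will_init());

    // A non-init WAL record must be applied on top of an earlier page version.
    let delta = Value::WalRecord(NeonWalRecord::Postgres {
        will_init: false,
        rec: Bytes::from_static(b"raw xlog payload"),
    });
    assert!(!delta.will_init());
}
```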
-pub(crate) struct ValueBytes; +pub struct ValueBytes; impl ValueBytes { - pub(crate) fn will_init(raw: &[u8]) -> Result { + #[inline(always)] + pub fn will_init(raw: &[u8]) -> Result { if raw.len() < 12 { return Err(InvalidInput::TooShortValue); } @@ -79,6 +85,7 @@ impl ValueBytes { mod test { use super::*; + use bytes::Bytes; use utils::bin_ser::BeSer; macro_rules! roundtrip { @@ -229,56 +236,3 @@ mod test { assert!(!ValueBytes::will_init(&expected).unwrap()); } } - -/// -/// Result of performing GC -/// -#[derive(Default, Serialize, Debug)] -pub struct GcResult { - pub layers_total: u64, - pub layers_needed_by_cutoff: u64, - pub layers_needed_by_pitr: u64, - pub layers_needed_by_branches: u64, - pub layers_needed_by_leases: u64, - pub layers_not_updated: u64, - pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files. - - #[serde(serialize_with = "serialize_duration_as_millis")] - pub elapsed: Duration, - - /// The layers which were garbage collected. - /// - /// Used in `/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc` to wait for the layers to be - /// dropped in tests. - #[cfg(feature = "testing")] - #[serde(skip)] - pub(crate) doomed_layers: Vec, -} - -// helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds -fn serialize_duration_as_millis(d: &Duration, serializer: S) -> Result -where - S: serde::Serializer, -{ - d.as_millis().serialize(serializer) -} - -impl AddAssign for GcResult { - fn add_assign(&mut self, other: Self) { - self.layers_total += other.layers_total; - self.layers_needed_by_pitr += other.layers_needed_by_pitr; - self.layers_needed_by_cutoff += other.layers_needed_by_cutoff; - self.layers_needed_by_branches += other.layers_needed_by_branches; - self.layers_needed_by_leases += other.layers_needed_by_leases; - self.layers_not_updated += other.layers_not_updated; - self.layers_removed += other.layers_removed; - - self.elapsed += other.elapsed; - - #[cfg(feature = "testing")] - { - let mut other = other; - self.doomed_layers.append(&mut other.doomed_layers); - } - } -} diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index ef17833a48..e1f5443cbe 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -15,6 +15,7 @@ memoffset.workspace = true thiserror.workspace = true serde.workspace = true utils.workspace = true +tracing.workspace = true [dev-dependencies] env_logger.workspace = true diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 0d46ed6aac..6b219488ac 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -217,6 +217,7 @@ macro_rules! enum_pgversion { pub mod pg_constants; pub mod relfile_utils; +pub mod walrecord; // Export some widely used datatypes that are unlikely to change across Postgres versions pub use v14::bindings::RepOriginId; diff --git a/pageserver/src/walrecord.rs b/libs/postgres_ffi/src/walrecord.rs similarity index 88% rename from pageserver/src/walrecord.rs rename to libs/postgres_ffi/src/walrecord.rs index dd199e2c55..dedbaef64d 100644 --- a/pageserver/src/walrecord.rs +++ b/libs/postgres_ffi/src/walrecord.rs @@ -1,107 +1,144 @@ +//! This module houses types used in decoding of PG WAL +//! records. //! -//! Functions for parsing WAL records. -//! +//! 
TODO: Generate separate types for each supported PG version -use anyhow::Result; +use crate::pg_constants; +use crate::XLogRecord; +use crate::{ + BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId, TimestampTz, + TransactionId, +}; +use crate::{BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD}; use bytes::{Buf, Bytes}; -use postgres_ffi::dispatch_pgversion; -use postgres_ffi::pg_constants; -use postgres_ffi::BLCKSZ; -use postgres_ffi::{BlockNumber, TimestampTz}; -use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; -use postgres_ffi::{RepOriginId, XLogRecord, XLOG_SIZE_OF_XLOG_RECORD}; use serde::{Deserialize, Serialize}; -use tracing::*; -use utils::{bin_ser::DeserializeError, lsn::Lsn}; +use utils::bin_ser::DeserializeError; +use utils::lsn::Lsn; -/// Each update to a page is represented by a NeonWalRecord. It can be a wrapper -/// around a PostgreSQL WAL record, or a custom neon-specific "record". -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum NeonWalRecord { - /// Native PostgreSQL WAL record - Postgres { will_init: bool, rec: Bytes }, - - /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear) - ClearVisibilityMapFlags { - new_heap_blkno: Option, - old_heap_blkno: Option, - flags: u8, - }, - /// Mark transaction IDs as committed on a CLOG page - ClogSetCommitted { - xids: Vec, - timestamp: TimestampTz, - }, - /// Mark transaction IDs as aborted on a CLOG page - ClogSetAborted { xids: Vec }, - /// Extend multixact offsets SLRU - MultixactOffsetCreate { - mid: MultiXactId, - moff: MultiXactOffset, - }, - /// Extend multixact members SLRU. - MultixactMembersCreate { - moff: MultiXactOffset, - members: Vec, - }, - /// Update the map of AUX files, either writing or dropping an entry - AuxFile { - file_path: String, - content: Option, - }, - - /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it. - #[cfg(test)] - Test { - /// Append a string to the image. - append: String, - /// Clear the image before appending. - clear: bool, - /// Treat this record as an init record. `clear` should be set to true if this field is set - /// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and - /// its references in `timeline.rs`. - will_init: bool, - }, +#[repr(C)] +#[derive(Debug)] +pub struct XlMultiXactCreate { + pub mid: MultiXactId, + /* new MultiXact's ID */ + pub moff: MultiXactOffset, + /* its starting offset in members file */ + pub nmembers: u32, + /* number of member XIDs */ + pub members: Vec, } -impl NeonWalRecord { - /// Does replaying this WAL record initialize the page from scratch, or does - /// it need to be applied over the previous image of the page? - pub fn will_init(&self) -> bool { - // If you change this function, you'll also need to change ValueBytes::will_init - match self { - NeonWalRecord::Postgres { will_init, rec: _ } => *will_init, - #[cfg(test)] - NeonWalRecord::Test { will_init, .. 
} => *will_init, - // None of the special neon record types currently initialize the page - _ => false, +impl XlMultiXactCreate { + pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate { + let mid = buf.get_u32_le(); + let moff = buf.get_u32_le(); + let nmembers = buf.get_u32_le(); + let mut members = Vec::new(); + for _ in 0..nmembers { + members.push(MultiXactMember::decode(buf)); + } + XlMultiXactCreate { + mid, + moff, + nmembers, + members, } } +} - #[cfg(test)] - pub(crate) fn wal_append(s: impl AsRef) -> Self { - Self::Test { - append: s.as_ref().to_string(), - clear: false, - will_init: false, +#[repr(C)] +#[derive(Debug)] +pub struct XlMultiXactTruncate { + pub oldest_multi_db: Oid, + /* to-be-truncated range of multixact offsets */ + pub start_trunc_off: MultiXactId, + /* just for completeness' sake */ + pub end_trunc_off: MultiXactId, + + /* to-be-truncated range of multixact members */ + pub start_trunc_memb: MultiXactOffset, + pub end_trunc_memb: MultiXactOffset, +} + +impl XlMultiXactTruncate { + pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate { + XlMultiXactTruncate { + oldest_multi_db: buf.get_u32_le(), + start_trunc_off: buf.get_u32_le(), + end_trunc_off: buf.get_u32_le(), + start_trunc_memb: buf.get_u32_le(), + end_trunc_memb: buf.get_u32_le(), } } +} - #[cfg(test)] - pub(crate) fn wal_clear() -> Self { - Self::Test { - append: "".to_string(), - clear: true, - will_init: false, +#[repr(C)] +#[derive(Debug)] +pub struct XlRelmapUpdate { + pub dbid: Oid, /* database ID, or 0 for shared map */ + pub tsid: Oid, /* database's tablespace, or pg_global */ + pub nbytes: i32, /* size of relmap data */ +} + +impl XlRelmapUpdate { + pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate { + XlRelmapUpdate { + dbid: buf.get_u32_le(), + tsid: buf.get_u32_le(), + nbytes: buf.get_i32_le(), } } +} - #[cfg(test)] - pub(crate) fn wal_init() -> Self { - Self::Test { - append: "".to_string(), - clear: true, - will_init: true, +#[repr(C)] +#[derive(Debug)] +pub struct XlReploriginDrop { + pub node_id: RepOriginId, +} + +impl XlReploriginDrop { + pub fn decode(buf: &mut Bytes) -> XlReploriginDrop { + XlReploriginDrop { + node_id: buf.get_u16_le(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlReploriginSet { + pub remote_lsn: Lsn, + pub node_id: RepOriginId, +} + +impl XlReploriginSet { + pub fn decode(buf: &mut Bytes) -> XlReploriginSet { + XlReploriginSet { + remote_lsn: Lsn(buf.get_u64_le()), + node_id: buf.get_u16_le(), + } + } +} + +#[repr(C)] +#[derive(Debug, Clone, Copy)] +pub struct RelFileNode { + pub spcnode: Oid, /* tablespace */ + pub dbnode: Oid, /* database */ + pub relnode: Oid, /* relation */ +} + +#[repr(C)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct MultiXactMember { + pub xid: TransactionId, + pub status: MultiXactStatus, +} + +impl MultiXactMember { + pub fn decode(buf: &mut Bytes) -> MultiXactMember { + MultiXactMember { + xid: buf.get_u32_le(), + status: buf.get_u32_le(), } } } @@ -164,17 +201,17 @@ impl DecodedWALRecord { /// Check if this WAL record represents a legacy "copy" database creation, which populates new relations /// by reading other existing relations' data blocks. This is more complex to apply than new-style database /// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case. 
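As a usage sketch of the `decode` pattern these structs share: each one reads its fixed-size fields little-endian out of a `Bytes` buffer, exactly in declaration order. The OIDs and sizes below are invented for illustration.

```rust
use bytes::{BufMut, Bytes, BytesMut};
use postgres_ffi::walrecord::XlRelmapUpdate;

fn main() {
    // Lay out the three little-endian fields in the order decode() consumes them.
    let mut raw = BytesMut::new();
    raw.put_u32_le(16384); // dbid: database OID
    raw.put_u32_le(1663);  // tsid: tablespace OID
    raw.put_i32_le(512);   // nbytes: size of the relmap payload that follows
    let mut buf: Bytes = raw.freeze();

    let update = XlRelmapUpdate::decode(&mut buf);
    assert_eq!((update.dbid, update.tsid, update.nbytes), (16384, 1663, 512));
}
```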
- pub(crate) fn is_dbase_create_copy(&self, pg_version: u32) -> bool { + pub fn is_dbase_create_copy(&self, pg_version: u32) -> bool { if self.xl_rmid == pg_constants::RM_DBASE_ID { let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK; match pg_version { 14 => { // Postgres 14 database creations are always the legacy kind - info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE + info == crate::v14::bindings::XLOG_DBASE_CREATE } - 15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY, - 16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY, - 17 => info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY, + 15 => info == crate::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY, + 16 => info == crate::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY, + 17 => info == crate::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY, _ => { panic!("Unsupported postgres version {pg_version}") } @@ -185,35 +222,294 @@ impl DecodedWALRecord { } } -#[repr(C)] -#[derive(Debug, Clone, Copy)] -pub struct RelFileNode { - pub spcnode: Oid, /* tablespace */ - pub dbnode: Oid, /* database */ - pub relnode: Oid, /* relation */ -} +/// Main routine to decode a WAL record and figure out which blocks are modified +// +// See xlogrecord.h for details +// The overall layout of an XLOG record is: +// Fixed-size header (XLogRecord struct) +// XLogRecordBlockHeader struct +// If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows +// If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an +// XLogRecordBlockCompressHeader struct follows. +// If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows +// BlockNumber follows +// XLogRecordBlockHeader struct +// ... +// XLogRecordDataHeader[Short|Long] struct +// block data +// block data +// ... +// main data +// +// +// For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in. +// It would be more natural for this function to return a DecodedWALRecord as return value, +// but reusing the caller-supplied struct avoids an allocation. +// This code is in the hot path for digesting incoming WAL, and is very performance sensitive. +// +pub fn decode_wal_record( + record: Bytes, + decoded: &mut DecodedWALRecord, + pg_version: u32, +) -> anyhow::Result<()> { + let mut rnode_spcnode: u32 = 0; + let mut rnode_dbnode: u32 = 0; + let mut rnode_relnode: u32 = 0; + let mut got_rnode = false; + let mut origin_id: u16 = 0; -#[repr(C)] -#[derive(Debug)] -pub struct XlRelmapUpdate { - pub dbid: Oid, /* database ID, or 0 for shared map */ - pub tsid: Oid, /* database's tablespace, or pg_global */ - pub nbytes: i32, /* size of relmap data */ -} + let mut buf = record.clone(); -impl XlRelmapUpdate { - pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate { - XlRelmapUpdate { - dbid: buf.get_u32_le(), - tsid: buf.get_u32_le(), - nbytes: buf.get_i32_le(), + // 1. Parse XLogRecord struct + + // FIXME: assume little-endian here + let xlogrec = XLogRecord::from_bytes(&mut buf)?; + + tracing::trace!( + "decode_wal_record xl_rmid = {} xl_info = {}", + xlogrec.xl_rmid, + xlogrec.xl_info + ); + + let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD; + + if buf.remaining() != remaining { + //TODO error + } + + let mut max_block_id = 0; + let mut blocks_total_len: u32 = 0; + let mut main_data_len = 0; + let mut datatotal: u32 = 0; + decoded.blocks.clear(); + + // 2. Decode the headers. 
+ // XLogRecordBlockHeaders if any, + // XLogRecordDataHeader[Short|Long] + while buf.remaining() > datatotal as usize { + let block_id = buf.get_u8(); + + match block_id { + pg_constants::XLR_BLOCK_ID_DATA_SHORT => { + /* XLogRecordDataHeaderShort */ + main_data_len = buf.get_u8() as u32; + datatotal += main_data_len; + } + + pg_constants::XLR_BLOCK_ID_DATA_LONG => { + /* XLogRecordDataHeaderLong */ + main_data_len = buf.get_u32_le(); + datatotal += main_data_len; + } + + pg_constants::XLR_BLOCK_ID_ORIGIN => { + // RepOriginId is uint16 + origin_id = buf.get_u16_le(); + } + + pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => { + // TransactionId is uint32 + buf.advance(4); + } + + 0..=pg_constants::XLR_MAX_BLOCK_ID => { + /* XLogRecordBlockHeader */ + let mut blk = DecodedBkpBlock::new(); + + if block_id <= max_block_id { + // TODO + //report_invalid_record(state, + // "out-of-order block_id %u at %X/%X", + // block_id, + // (uint32) (state->ReadRecPtr >> 32), + // (uint32) state->ReadRecPtr); + // goto err; + } + max_block_id = block_id; + + let fork_flags: u8 = buf.get_u8(); + blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK; + blk.flags = fork_flags; + blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0; + blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0; + blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0; + blk.data_len = buf.get_u16_le(); + + /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */ + + datatotal += blk.data_len as u32; + blocks_total_len += blk.data_len as u32; + + if blk.has_image { + blk.bimg_len = buf.get_u16_le(); + blk.hole_offset = buf.get_u16_le(); + blk.bimg_info = buf.get_u8(); + + blk.apply_image = dispatch_pgversion!( + pg_version, + (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0 + ); + + let blk_img_is_compressed = + crate::bkpimage_is_compressed(blk.bimg_info, pg_version); + + if blk_img_is_compressed { + tracing::debug!("compressed block image , pg_version = {}", pg_version); + } + + if blk_img_is_compressed { + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 { + blk.hole_length = buf.get_u16_le(); + } else { + blk.hole_length = 0; + } + } else { + blk.hole_length = BLCKSZ - blk.bimg_len; + } + datatotal += blk.bimg_len as u32; + blocks_total_len += blk.bimg_len as u32; + + /* + * cross-check that hole_offset > 0, hole_length > 0 and + * bimg_len < BLCKSZ if the HAS_HOLE flag is set. + */ + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 + && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ) + { + // TODO + /* + report_invalid_record(state, + "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", + (unsigned int) blk->hole_offset, + (unsigned int) blk->hole_length, + (unsigned int) blk->bimg_len, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + */ + } + + /* + * cross-check that hole_offset == 0 and hole_length == 0 if + * the HAS_HOLE flag is not set. + */ + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 + && (blk.hole_offset != 0 || blk.hole_length != 0) + { + // TODO + /* + report_invalid_record(state, + "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", + (unsigned int) blk->hole_offset, + (unsigned int) blk->hole_length, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + */ + } + + /* + * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED + * flag is set. 
+ */ + if !blk_img_is_compressed && blk.bimg_len == BLCKSZ { + // TODO + /* + report_invalid_record(state, + "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X", + (unsigned int) blk->bimg_len, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + */ + } + + /* + * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor + * IS_COMPRESSED flag is set. + */ + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 + && !blk_img_is_compressed + && blk.bimg_len != BLCKSZ + { + // TODO + /* + report_invalid_record(state, + "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X", + (unsigned int) blk->data_len, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + */ + } + } + if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 { + rnode_spcnode = buf.get_u32_le(); + rnode_dbnode = buf.get_u32_le(); + rnode_relnode = buf.get_u32_le(); + got_rnode = true; + } else if !got_rnode { + // TODO + /* + report_invalid_record(state, + "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; */ + } + + blk.rnode_spcnode = rnode_spcnode; + blk.rnode_dbnode = rnode_dbnode; + blk.rnode_relnode = rnode_relnode; + + blk.blkno = buf.get_u32_le(); + tracing::trace!( + "this record affects {}/{}/{} blk {}", + rnode_spcnode, + rnode_dbnode, + rnode_relnode, + blk.blkno + ); + + decoded.blocks.push(blk); + } + + _ => { + // TODO: invalid block_id + } } } + + // 3. Decode blocks. + let mut ptr = record.len() - buf.remaining(); + for blk in decoded.blocks.iter_mut() { + if blk.has_image { + blk.bimg_offset = ptr as u32; + ptr += blk.bimg_len as usize; + } + if blk.has_data { + ptr += blk.data_len as usize; + } + } + // We don't need them, so just skip blocks_total_len bytes + buf.advance(blocks_total_len as usize); + assert_eq!(ptr, record.len() - buf.remaining()); + + let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize; + + // 4. Decode main_data + if main_data_len > 0 { + assert_eq!(buf.remaining(), main_data_len as usize); + } + + decoded.xl_xid = xlogrec.xl_xid; + decoded.xl_info = xlogrec.xl_info; + decoded.xl_rmid = xlogrec.xl_rmid; + decoded.record = record; + decoded.origin_id = origin_id; + decoded.main_data_offset = main_data_offset; + + Ok(()) } pub mod v14 { + use crate::{OffsetNumber, TransactionId}; use bytes::{Buf, Bytes}; - use postgres_ffi::{OffsetNumber, TransactionId}; #[repr(C)] #[derive(Debug)] @@ -383,8 +679,8 @@ pub mod v15 { pub mod v16 { pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange}; + use crate::{OffsetNumber, TransactionId}; use bytes::{Buf, Bytes}; - use postgres_ffi::{OffsetNumber, TransactionId}; pub struct XlHeapDelete { pub xmax: TransactionId, @@ -450,8 +746,8 @@ pub mod v16 { /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. 
*/ pub mod rm_neon { + use crate::{OffsetNumber, TransactionId}; use bytes::{Buf, Bytes}; - use postgres_ffi::{OffsetNumber, TransactionId}; #[repr(C)] #[derive(Debug)] @@ -563,8 +859,8 @@ pub mod v16 { pub mod v17 { pub use super::v14::XlHeapLockUpdated; + pub use crate::{TimeLineID, TimestampTz}; use bytes::{Buf, Bytes}; - pub use postgres_ffi::{TimeLineID, TimestampTz}; pub use super::v16::rm_neon; pub use super::v16::{ @@ -742,7 +1038,7 @@ impl XlXactParsedRecord { let spcnode = buf.get_u32_le(); let dbnode = buf.get_u32_le(); let relnode = buf.get_u32_le(); - trace!( + tracing::trace!( "XLOG_XACT_COMMIT relfilenode {}/{}/{}", spcnode, dbnode, @@ -756,9 +1052,9 @@ impl XlXactParsedRecord { } } - if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 { + if xinfo & crate::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 { let nitems = buf.get_i32_le(); - debug!( + tracing::debug!( "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}", nitems ); @@ -778,7 +1074,7 @@ impl XlXactParsedRecord { if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { xid = buf.get_u32_le(); - debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid); + tracing::debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid); } let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 { @@ -822,78 +1118,6 @@ impl XlClogTruncate { } } -#[repr(C)] -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct MultiXactMember { - pub xid: TransactionId, - pub status: MultiXactStatus, -} - -impl MultiXactMember { - pub fn decode(buf: &mut Bytes) -> MultiXactMember { - MultiXactMember { - xid: buf.get_u32_le(), - status: buf.get_u32_le(), - } - } -} - -#[repr(C)] -#[derive(Debug)] -pub struct XlMultiXactCreate { - pub mid: MultiXactId, - /* new MultiXact's ID */ - pub moff: MultiXactOffset, - /* its starting offset in members file */ - pub nmembers: u32, - /* number of member XIDs */ - pub members: Vec, -} - -impl XlMultiXactCreate { - pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate { - let mid = buf.get_u32_le(); - let moff = buf.get_u32_le(); - let nmembers = buf.get_u32_le(); - let mut members = Vec::new(); - for _ in 0..nmembers { - members.push(MultiXactMember::decode(buf)); - } - XlMultiXactCreate { - mid, - moff, - nmembers, - members, - } - } -} - -#[repr(C)] -#[derive(Debug)] -pub struct XlMultiXactTruncate { - pub oldest_multi_db: Oid, - /* to-be-truncated range of multixact offsets */ - pub start_trunc_off: MultiXactId, - /* just for completeness' sake */ - pub end_trunc_off: MultiXactId, - - /* to-be-truncated range of multixact members */ - pub start_trunc_memb: MultiXactOffset, - pub end_trunc_memb: MultiXactOffset, -} - -impl XlMultiXactTruncate { - pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate { - XlMultiXactTruncate { - oldest_multi_db: buf.get_u32_le(), - start_trunc_off: buf.get_u32_le(), - end_trunc_off: buf.get_u32_le(), - start_trunc_memb: buf.get_u32_le(), - end_trunc_memb: buf.get_u32_le(), - } - } -} - #[repr(C)] #[derive(Debug)] pub struct XlLogicalMessage { @@ -950,337 +1174,7 @@ impl XlRunningXacts { } } -#[repr(C)] -#[derive(Debug)] -pub struct XlReploriginDrop { - pub node_id: RepOriginId, -} - -impl XlReploriginDrop { - pub fn decode(buf: &mut Bytes) -> XlReploriginDrop { - XlReploriginDrop { - node_id: buf.get_u16_le(), - } - } -} - -#[repr(C)] -#[derive(Debug)] -pub struct XlReploriginSet { - pub remote_lsn: Lsn, - pub node_id: RepOriginId, -} - -impl XlReploriginSet { - pub fn decode(buf: &mut Bytes) -> XlReploriginSet 
{ - XlReploriginSet { - remote_lsn: Lsn(buf.get_u64_le()), - node_id: buf.get_u16_le(), - } - } -} - -/// Main routine to decode a WAL record and figure out which blocks are modified -// -// See xlogrecord.h for details -// The overall layout of an XLOG record is: -// Fixed-size header (XLogRecord struct) -// XLogRecordBlockHeader struct -// If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows -// If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an -// XLogRecordBlockCompressHeader struct follows. -// If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows -// BlockNumber follows -// XLogRecordBlockHeader struct -// ... -// XLogRecordDataHeader[Short|Long] struct -// block data -// block data -// ... -// main data -// -// -// For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in. -// It would be more natural for this function to return a DecodedWALRecord as return value, -// but reusing the caller-supplied struct avoids an allocation. -// This code is in the hot path for digesting incoming WAL, and is very performance sensitive. -// -pub fn decode_wal_record( - record: Bytes, - decoded: &mut DecodedWALRecord, - pg_version: u32, -) -> Result<()> { - let mut rnode_spcnode: u32 = 0; - let mut rnode_dbnode: u32 = 0; - let mut rnode_relnode: u32 = 0; - let mut got_rnode = false; - let mut origin_id: u16 = 0; - - let mut buf = record.clone(); - - // 1. Parse XLogRecord struct - - // FIXME: assume little-endian here - let xlogrec = XLogRecord::from_bytes(&mut buf)?; - - trace!( - "decode_wal_record xl_rmid = {} xl_info = {}", - xlogrec.xl_rmid, - xlogrec.xl_info - ); - - let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD; - - if buf.remaining() != remaining { - //TODO error - } - - let mut max_block_id = 0; - let mut blocks_total_len: u32 = 0; - let mut main_data_len = 0; - let mut datatotal: u32 = 0; - decoded.blocks.clear(); - - // 2. Decode the headers. 
- // XLogRecordBlockHeaders if any, - // XLogRecordDataHeader[Short|Long] - while buf.remaining() > datatotal as usize { - let block_id = buf.get_u8(); - - match block_id { - pg_constants::XLR_BLOCK_ID_DATA_SHORT => { - /* XLogRecordDataHeaderShort */ - main_data_len = buf.get_u8() as u32; - datatotal += main_data_len; - } - - pg_constants::XLR_BLOCK_ID_DATA_LONG => { - /* XLogRecordDataHeaderLong */ - main_data_len = buf.get_u32_le(); - datatotal += main_data_len; - } - - pg_constants::XLR_BLOCK_ID_ORIGIN => { - // RepOriginId is uint16 - origin_id = buf.get_u16_le(); - } - - pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => { - // TransactionId is uint32 - buf.advance(4); - } - - 0..=pg_constants::XLR_MAX_BLOCK_ID => { - /* XLogRecordBlockHeader */ - let mut blk = DecodedBkpBlock::new(); - - if block_id <= max_block_id { - // TODO - //report_invalid_record(state, - // "out-of-order block_id %u at %X/%X", - // block_id, - // (uint32) (state->ReadRecPtr >> 32), - // (uint32) state->ReadRecPtr); - // goto err; - } - max_block_id = block_id; - - let fork_flags: u8 = buf.get_u8(); - blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK; - blk.flags = fork_flags; - blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0; - blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0; - blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0; - blk.data_len = buf.get_u16_le(); - - /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */ - - datatotal += blk.data_len as u32; - blocks_total_len += blk.data_len as u32; - - if blk.has_image { - blk.bimg_len = buf.get_u16_le(); - blk.hole_offset = buf.get_u16_le(); - blk.bimg_info = buf.get_u8(); - - blk.apply_image = dispatch_pgversion!( - pg_version, - (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0 - ); - - let blk_img_is_compressed = - postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version); - - if blk_img_is_compressed { - debug!("compressed block image , pg_version = {}", pg_version); - } - - if blk_img_is_compressed { - if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 { - blk.hole_length = buf.get_u16_le(); - } else { - blk.hole_length = 0; - } - } else { - blk.hole_length = BLCKSZ - blk.bimg_len; - } - datatotal += blk.bimg_len as u32; - blocks_total_len += blk.bimg_len as u32; - - /* - * cross-check that hole_offset > 0, hole_length > 0 and - * bimg_len < BLCKSZ if the HAS_HOLE flag is set. - */ - if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 - && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ) - { - // TODO - /* - report_invalid_record(state, - "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", - (unsigned int) blk->hole_offset, - (unsigned int) blk->hole_length, - (unsigned int) blk->bimg_len, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - - /* - * cross-check that hole_offset == 0 and hole_length == 0 if - * the HAS_HOLE flag is not set. - */ - if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 - && (blk.hole_offset != 0 || blk.hole_length != 0) - { - // TODO - /* - report_invalid_record(state, - "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", - (unsigned int) blk->hole_offset, - (unsigned int) blk->hole_length, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - - /* - * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED - * flag is set. 
- */ - if !blk_img_is_compressed && blk.bimg_len == BLCKSZ { - // TODO - /* - report_invalid_record(state, - "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X", - (unsigned int) blk->bimg_len, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - - /* - * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor - * IS_COMPRESSED flag is set. - */ - if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 - && !blk_img_is_compressed - && blk.bimg_len != BLCKSZ - { - // TODO - /* - report_invalid_record(state, - "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X", - (unsigned int) blk->data_len, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - } - if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 { - rnode_spcnode = buf.get_u32_le(); - rnode_dbnode = buf.get_u32_le(); - rnode_relnode = buf.get_u32_le(); - got_rnode = true; - } else if !got_rnode { - // TODO - /* - report_invalid_record(state, - "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X", - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; */ - } - - blk.rnode_spcnode = rnode_spcnode; - blk.rnode_dbnode = rnode_dbnode; - blk.rnode_relnode = rnode_relnode; - - blk.blkno = buf.get_u32_le(); - trace!( - "this record affects {}/{}/{} blk {}", - rnode_spcnode, - rnode_dbnode, - rnode_relnode, - blk.blkno - ); - - decoded.blocks.push(blk); - } - - _ => { - // TODO: invalid block_id - } - } - } - - // 3. Decode blocks. - let mut ptr = record.len() - buf.remaining(); - for blk in decoded.blocks.iter_mut() { - if blk.has_image { - blk.bimg_offset = ptr as u32; - ptr += blk.bimg_len as usize; - } - if blk.has_data { - ptr += blk.data_len as usize; - } - } - // We don't need them, so just skip blocks_total_len bytes - buf.advance(blocks_total_len as usize); - assert_eq!(ptr, record.len() - buf.remaining()); - - let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize; - - // 4. Decode main_data - if main_data_len > 0 { - assert_eq!(buf.remaining(), main_data_len as usize); - } - - decoded.xl_xid = xlogrec.xl_xid; - decoded.xl_info = xlogrec.xl_info; - decoded.xl_rmid = xlogrec.xl_rmid; - decoded.record = record; - decoded.origin_id = origin_id; - decoded.main_data_offset = main_data_offset; - - Ok(()) -} - -/// -/// Build a human-readable string to describe a WAL record -/// -/// For debugging purposes -pub fn describe_wal_record(rec: &NeonWalRecord) -> Result { - match rec { - NeonWalRecord::Postgres { will_init, rec } => Ok(format!( - "will_init: {}, {}", - will_init, - describe_postgres_wal_record(rec)? - )), - _ => Ok(format!("{:?}", rec)), - } -} - -fn describe_postgres_wal_record(record: &Bytes) -> Result { +pub fn describe_postgres_wal_record(record: &Bytes) -> Result { // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this. // Maybe use the postgres wal redo process, the same used for replaying WAL records? 
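To make the hot-path comment on `decode_wal_record` concrete, here is a sketch of the caller-supplied-struct pattern: one `DecodedWALRecord` reused across records. It assumes the patched `postgres_ffi` crate with `anyhow`, that `DecodedWALRecord` derives `Default` (it does in this module today), and that `records` already holds complete XLOG records, e.g. from `WalStreamDecoder`.

```rust
use bytes::Bytes;
use postgres_ffi::walrecord::{decode_wal_record, DecodedWALRecord};

fn decode_all(records: Vec<Bytes>, pg_version: u32) -> anyhow::Result<()> {
    // One allocation, reused for every record on the ingest hot path.
    let mut decoded = DecodedWALRecord::default();
    for rec in records {
        decode_wal_record(rec, &mut decoded, pg_version)?;
        for blk in &decoded.blocks {
            println!(
                "rel {}/{}/{} blk {} will_init={}",
                blk.rnode_spcnode, blk.rnode_dbnode, blk.rnode_relnode, blk.blkno, blk.will_init
            );
        }
    }
    Ok(())
}
```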
// Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly, diff --git a/libs/wal_decoder/Cargo.toml b/libs/wal_decoder/Cargo.toml new file mode 100644 index 0000000000..3f80f8fcdb --- /dev/null +++ b/libs/wal_decoder/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "wal_decoder" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[features] +testing = [] + +[dependencies] +anyhow.workspace = true +bytes.workspace = true +pageserver_api.workspace = true +postgres_ffi.workspace = true +serde.workspace = true +tracing.workspace = true +utils.workspace = true +workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/libs/wal_decoder/src/decoder.rs @@ -0,0 +1 @@ + diff --git a/libs/wal_decoder/src/lib.rs b/libs/wal_decoder/src/lib.rs new file mode 100644 index 0000000000..05349d17c9 --- /dev/null +++ b/libs/wal_decoder/src/lib.rs @@ -0,0 +1,2 @@ +pub mod decoder; +pub mod models; diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs new file mode 100644 index 0000000000..58f8e1b2da --- /dev/null +++ b/libs/wal_decoder/src/models.rs @@ -0,0 +1,167 @@ +//! This module houses types which represent decoded PG WAL records +//! ready for the pageserver to interpret. They are derived from the original +//! WAL records, so that each struct corresponds closely to one WAL record of +//! a specific kind. They contain the same information as the original WAL records, +//! just decoded into structs and fields for easier access. +//! +//! The ingestion code uses these structs to help with parsing the WAL records, +//! and it splits them into a stream of modifications to the key-value pairs that +//! are ultimately stored in delta layers. See also the split-out counterparts in +//! [`postgres_ffi::walrecord`]. +//! +//! The pipeline which processes WAL records is not super obvious, so let's follow +//! the flow of an example XACT_COMMIT Postgres record: +//! +//! (Postgres XACT_COMMIT record) +//! | +//! |--> pageserver::walingest::WalIngest::decode_xact_record +//! | +//! |--> ([`XactRecord::Commit`]) +//! | +//! |--> pageserver::walingest::WalIngest::ingest_xact_record +//! | +//! |--> (NeonWalRecord::ClogSetCommitted) +//! | +//! 
|--> write to KV store within the pageserver + +use bytes::Bytes; +use pageserver_api::reltag::{RelTag, SlruKind}; +use postgres_ffi::walrecord::{ + XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet, + XlSmgrTruncate, XlXactParsedRecord, +}; +use postgres_ffi::{Oid, TransactionId}; +use utils::lsn::Lsn; + +pub enum HeapamRecord { + ClearVmBits(ClearVmBits), +} + +pub struct ClearVmBits { + pub new_heap_blkno: Option, + pub old_heap_blkno: Option, + pub vm_rel: RelTag, + pub flags: u8, +} + +pub enum NeonrmgrRecord { + ClearVmBits(ClearVmBits), +} + +pub enum SmgrRecord { + Create(SmgrCreate), + Truncate(XlSmgrTruncate), +} + +pub struct SmgrCreate { + pub rel: RelTag, +} + +pub enum DbaseRecord { + Create(DbaseCreate), + Drop(DbaseDrop), +} + +pub struct DbaseCreate { + pub db_id: Oid, + pub tablespace_id: Oid, + pub src_db_id: Oid, + pub src_tablespace_id: Oid, +} + +pub struct DbaseDrop { + pub db_id: Oid, + pub tablespace_ids: Vec, +} + +pub enum ClogRecord { + ZeroPage(ClogZeroPage), + Truncate(ClogTruncate), +} + +pub struct ClogZeroPage { + pub segno: u32, + pub rpageno: u32, +} + +pub struct ClogTruncate { + pub pageno: u32, + pub oldest_xid: TransactionId, + pub oldest_xid_db: Oid, +} + +pub enum XactRecord { + Commit(XactCommon), + Abort(XactCommon), + CommitPrepared(XactCommon), + AbortPrepared(XactCommon), + Prepare(XactPrepare), +} + +pub struct XactCommon { + pub parsed: XlXactParsedRecord, + pub origin_id: u16, + // Fields below are only used for logging + pub xl_xid: TransactionId, + pub lsn: Lsn, +} + +pub struct XactPrepare { + pub xl_xid: TransactionId, + pub data: Bytes, +} + +pub enum MultiXactRecord { + ZeroPage(MultiXactZeroPage), + Create(XlMultiXactCreate), + Truncate(XlMultiXactTruncate), +} + +pub struct MultiXactZeroPage { + pub slru_kind: SlruKind, + pub segno: u32, + pub rpageno: u32, +} + +pub enum RelmapRecord { + Update(RelmapUpdate), +} + +pub struct RelmapUpdate { + pub update: XlRelmapUpdate, + pub buf: Bytes, +} + +pub enum XlogRecord { + Raw(RawXlogRecord), +} + +pub struct RawXlogRecord { + pub info: u8, + pub lsn: Lsn, + pub buf: Bytes, +} + +pub enum LogicalMessageRecord { + Put(PutLogicalMessage), + #[cfg(feature = "testing")] + Failpoint, +} + +pub struct PutLogicalMessage { + pub path: String, + pub buf: Bytes, +} + +pub enum StandbyRecord { + RunningXacts(StandbyRunningXacts), +} + +pub struct StandbyRunningXacts { + pub oldest_running_xid: TransactionId, +} + +pub enum ReploriginRecord { + Set(XlReploriginSet), + Drop(XlReploriginDrop), +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 2531abc7a1..ecb8fa7491 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -8,7 +8,7 @@ license.workspace = true default = [] # Enables test-only APIs, incuding failpoints. 
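To make the new `wal_decoder::models` layer above concrete, a small sketch of one intermediate record built by hand, as the ingest path would produce it from a decoded heapam record. Assumes the new `wal_decoder` crate plus its `pageserver_api`/`postgres_ffi` dependencies; the OIDs and block numbers are made up.

```rust
use pageserver_api::reltag::RelTag;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use wal_decoder::models::{ClearVmBits, HeapamRecord};

fn main() {
    let vm_rel = RelTag {
        spcnode: 1663, // pg_default tablespace
        dbnode: 16384,
        relnode: 24576,
        forknum: VISIBILITYMAP_FORKNUM,
    };
    let rec = HeapamRecord::ClearVmBits(ClearVmBits {
        new_heap_blkno: Some(7),
        old_heap_blkno: None,
        vm_rel,
        flags: 0x01, // bitmap of visibility-map bits to clear
    });
    match rec {
        HeapamRecord::ClearVmBits(bits) => assert_eq!(bits.new_heap_blkno, Some(7)),
    }
}
```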
In particular, enables the `fail_point!` macro, # which adds some runtime cost to run tests on outage conditions -testing = ["fail/failpoints", "pageserver_api/testing" ] +testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing"] [dependencies] anyhow.workspace = true @@ -83,6 +83,7 @@ enum-map.workspace = true enumset = { workspace = true, features = ["serde"]} strum.workspace = true strum_macros.workspace = true +wal_decoder.workspace = true [target.'cfg(target_os = "linux")'.dependencies] procfs.workspace = true diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index d98b23acce..0a1ad9cd6b 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -8,13 +8,12 @@ use pageserver::{ context::{DownloadBehavior, RequestContext}, l0_flush::{L0FlushConfig, L0FlushGlobalState}, page_cache, - repository::Value, task_mgr::TaskKind, tenant::storage_layer::inmemory_layer::SerializedBatch, tenant::storage_layer::InMemoryLayer, virtual_file, }; -use pageserver_api::{key::Key, shard::TenantShardId}; +use pageserver_api::{key::Key, shard::TenantShardId, value::Value}; use utils::{ bin_ser::BeSer, id::{TenantId, TimelineId}, diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs index 1353e79f7c..5c5b52db44 100644 --- a/pageserver/benches/bench_layer_map.rs +++ b/pageserver/benches/bench_layer_map.rs @@ -1,9 +1,9 @@ use criterion::measurement::WallTime; use pageserver::keyspace::{KeyPartitioning, KeySpace}; -use pageserver::repository::Key; use pageserver::tenant::layer_map::LayerMap; use pageserver::tenant::storage_layer::LayerName; use pageserver::tenant::storage_layer::PersistentLayerDesc; +use pageserver_api::key::Key; use pageserver_api::shard::TenantShardId; use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use std::cmp::{max, min}; diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 45936cb3fa..d3551b56e1 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -60,7 +60,8 @@ use anyhow::Context; use bytes::{Buf, Bytes}; use criterion::{BenchmarkId, Criterion}; use once_cell::sync::Lazy; -use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager}; +use pageserver::{config::PageServerConf, walredo::PostgresRedoManager}; +use pageserver_api::record::NeonWalRecord; use pageserver_api::{key::Key, shard::TenantShardId}; use std::{ future::Future, diff --git a/pageserver/ctl/src/draw_timeline_dir.rs b/pageserver/ctl/src/draw_timeline_dir.rs index bc939f9688..177e65ef79 100644 --- a/pageserver/ctl/src/draw_timeline_dir.rs +++ b/pageserver/ctl/src/draw_timeline_dir.rs @@ -51,7 +51,7 @@ //! 
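Since `Key` now comes from `pageserver_api::key` at every call site (the benches above, the ctl tools and HTTP handler below), a quick sketch of the hex round-trip those call sites rely on; the hex string is the test key used later in this patch, and `Key::from_hex` returns `anyhow::Result`.

```rust
use pageserver_api::key::{Key, KEY_SIZE};

fn main() -> anyhow::Result<()> {
    // An 18-byte key, printed and parsed as 36 hex characters.
    let key = Key::from_hex("010000000033333333444444445500000001")?;
    assert_eq!(KEY_SIZE, 18);
    assert_eq!(format!("{}", key), "010000000033333333444444445500000001");
    Ok(())
}
```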
use anyhow::{Context, Result}; -use pageserver::repository::Key; +use pageserver_api::key::Key; use std::cmp::Ordering; use std::io::{self, BufRead}; use std::path::PathBuf; diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 7dd2a5d05c..451d2a1d69 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -14,12 +14,12 @@ use std::ops::Range; use std::{fs, str}; use pageserver::page_cache::{self, PAGE_SZ}; -use pageserver::repository::{Key, KEY_SIZE}; use pageserver::tenant::block_io::FileBlockReader; use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection}; use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE}; use pageserver::tenant::storage_layer::range_overlaps; use pageserver::virtual_file::{self, VirtualFile}; +use pageserver_api::key::{Key, KEY_SIZE}; use utils::{bin_ser::BeSer, lsn::Lsn}; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index c0b2b6ae89..22627d72c8 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -14,13 +14,13 @@ use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use pageserver::virtual_file::api::IoMode; use pageserver::{page_cache, virtual_file}; use pageserver::{ - repository::{Key, KEY_SIZE}, tenant::{ block_io::FileBlockReader, disk_btree::VisitDirection, storage_layer::delta_layer::DELTA_KEY_SIZE, }, virtual_file::VirtualFile, }; +use pageserver_api::key::{Key, KEY_SIZE}; use std::fs; use utils::bin_ser::BeSer; use utils::id::{TenantId, TimelineId}; diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 73bdc90213..7733bdb640 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -696,7 +696,7 @@ impl DeletionQueue { mod test { use camino::Utf8Path; use hex_literal::hex; - use pageserver_api::{shard::ShardIndex, upcall_api::ReAttachResponseTenant}; + use pageserver_api::{key::Key, shard::ShardIndex, upcall_api::ReAttachResponseTenant}; use std::{io::ErrorKind, time::Duration}; use tracing::info; @@ -705,7 +705,6 @@ mod test { use crate::{ controller_upcall_client::RetryForeverError, - repository::Key, tenant::{harness::TenantHarness, storage_layer::DeltaLayerName}, }; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 3943f62ac0..2d8f4309ca 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2232,13 +2232,13 @@ async fn getpage_at_lsn_handler( check_permission(&request, Some(tenant_shard_id.tenant_id))?; let state = get_state(&request); - struct Key(crate::repository::Key); + struct Key(pageserver_api::key::Key); impl std::str::FromStr for Key { type Err = anyhow::Error; fn from_str(s: &str) -> std::result::Result { - crate::repository::Key::from_hex(s).map(Key) + pageserver_api::key::Key::from_hex(s).map(Key) } } diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index ca87f1d080..530c91c4da 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -19,12 +19,11 @@ use crate::metrics::WAL_INGEST; use crate::pgdatadir_mapping::*; use crate::tenant::Timeline; use crate::walingest::WalIngest; -use crate::walrecord::decode_wal_record; -use crate::walrecord::DecodedWALRecord; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; use postgres_ffi::waldecoder::WalStreamDecoder; +use 
postgres_ffi::walrecord::{decode_wal_record, DecodedWALRecord}; use postgres_ffi::ControlFileData; use postgres_ffi::DBState_DB_SHUTDOWNED; use postgres_ffi::Oid; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index d51931c768..ef6711397a 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -24,7 +24,6 @@ pub mod metrics; pub mod page_cache; pub mod page_service; pub mod pgdatadir_mapping; -pub mod repository; pub mod span; pub(crate) mod statvfs; pub mod task_mgr; @@ -32,7 +31,6 @@ pub mod tenant; pub mod utilization; pub mod virtual_file; pub mod walingest; -pub mod walrecord; pub mod walredo; use camino::Utf8Path; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 19233a28cc..dc2dc08b53 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -7,14 +7,14 @@ //! Clarify that) //! use super::tenant::{PageReconstructError, Timeline}; +use crate::aux_file; use crate::context::RequestContext; use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; -use crate::walrecord::NeonWalRecord; -use crate::{aux_file, repository::*}; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; +use pageserver_api::key::Key; use pageserver_api::key::{ dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key, @@ -22,7 +22,9 @@ use pageserver_api::key::{ CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, }; use pageserver_api::keyspace::SparseKeySpace; +use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; +use pageserver_api::value::Value; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId}; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 7011ae9e63..8445601d29 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -92,11 +92,11 @@ use crate::metrics::{ remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, }; -use crate::repository::GcResult; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::tenant::config::LocationMode; use crate::tenant::config::TenantConfOpt; +use crate::tenant::gc_result::GcResult; pub use crate::tenant::remote_timeline_client::index::IndexPart; use crate::tenant::remote_timeline_client::remote_initdb_archive_path; use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart; @@ -160,6 +160,7 @@ pub(crate) mod timeline; pub mod size; mod gc_block; +mod gc_result; pub(crate) mod throttle; pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; @@ -467,10 +468,10 @@ impl WalRedoManager { /// This method is cancellation-safe. 
pub async fn request_redo( &self, - key: crate::repository::Key, + key: pageserver_api::key::Key, lsn: Lsn, base_img: Option<(Lsn, bytes::Bytes)>, - records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>, + records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>, pg_version: u32, ) -> Result { match self { @@ -4818,7 +4819,8 @@ pub(crate) mod harness { use crate::deletion_queue::mock::MockDeletionQueue; use crate::l0_flush::L0FlushConfig; use crate::walredo::apply_neon; - use crate::{repository::Key, walrecord::NeonWalRecord}; + use pageserver_api::key::Key; + use pageserver_api::record::NeonWalRecord; use super::*; use hex_literal::hex; @@ -5087,25 +5089,30 @@ mod tests { use super::*; use crate::keyspace::KeySpaceAccum; - use crate::repository::{Key, Value}; use crate::tenant::harness::*; use crate::tenant::timeline::CompactFlags; - use crate::walrecord::NeonWalRecord; use crate::DEFAULT_PG_VERSION; use bytes::{Bytes, BytesMut}; use hex_literal::hex; use itertools::Itertools; - use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE}; + use pageserver_api::key::{Key, AUX_KEY_PREFIX, NON_INHERITED_RANGE}; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings}; + use pageserver_api::value::Value; use rand::{thread_rng, Rng}; use storage_layer::PersistentLayerKey; use tests::storage_layer::ValuesReconstructState; use tests::timeline::{GetVectoredError, ShutdownMode}; - use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn}; - use timeline::{DeltaLayerTestDesc, GcInfo}; + use timeline::DeltaLayerTestDesc; use utils::id::TenantId; + #[cfg(feature = "testing")] + use pageserver_api::record::NeonWalRecord; + #[cfg(feature = "testing")] + use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn}; + #[cfg(feature = "testing")] + use timeline::GcInfo; + static TEST_KEY: Lazy = Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001"))); @@ -7670,6 +7677,7 @@ mod tests { Ok(()) } + #[cfg(feature = "testing")] #[tokio::test] async fn test_neon_test_record() -> anyhow::Result<()> { let harness = TenantHarness::create("test_neon_test_record").await?; @@ -7861,6 +7869,7 @@ mod tests { Ok(()) } + #[cfg(feature = "testing")] #[tokio::test] async fn test_simple_bottom_most_compaction_deltas() -> anyhow::Result<()> { let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas").await?; @@ -8057,6 +8066,7 @@ mod tests { Ok(()) } + #[cfg(feature = "testing")] #[tokio::test] async fn test_generate_key_retention() -> anyhow::Result<()> { let harness = TenantHarness::create("test_generate_key_retention").await?; @@ -8404,6 +8414,7 @@ mod tests { Ok(()) } + #[cfg(feature = "testing")] #[tokio::test] async fn test_simple_bottom_most_compaction_with_retain_lsns() -> anyhow::Result<()> { let harness = @@ -8644,6 +8655,7 @@ mod tests { Ok(()) } + #[cfg(feature = "testing")] #[tokio::test] async fn test_simple_bottom_most_compaction_with_retain_lsns_single_key() -> anyhow::Result<()> { @@ -8852,6 +8864,7 @@ mod tests { Ok(()) } + #[cfg(feature = "testing")] #[tokio::test] async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> { let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?; @@ -9053,6 +9066,7 @@ mod tests { // // When querying the key range [A, B) we need to read at different LSN ranges // for [A, C) and [C, B). This test checks that the described edge case is handled correctly. 
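The `testing`-gated tests in this file lean on the relocated `Test` record helpers; here is a sketch of the kind of delta chain they build (requires `pageserver_api` with the `testing` feature enabled):

```rust
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;

fn main() {
    // An init record wipes the page; later appends stack on top of it.
    let history = vec![
        Value::WalRecord(NeonWalRecord::wal_init()),
        Value::WalRecord(NeonWalRecord::wal_append(";0x10")),
        Value::WalRecord(NeonWalRecord::wal_append(";0x20")),
    ];
    assert!(history[0].will_init()); // replay can start here...
    assert!(!history[1].will_init()); // ...but not here
}
```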
+    #[cfg(feature = "testing")]
     #[tokio::test]
     async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
         let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
diff --git a/pageserver/src/tenant/gc_result.rs b/pageserver/src/tenant/gc_result.rs
new file mode 100644
index 0000000000..c805aafeab
--- /dev/null
+++ b/pageserver/src/tenant/gc_result.rs
@@ -0,0 +1,57 @@
+use anyhow::Result;
+use serde::Serialize;
+use std::ops::AddAssign;
+use std::time::Duration;
+
+///
+/// Result of performing GC
+///
+#[derive(Default, Serialize, Debug)]
+pub struct GcResult {
+    pub layers_total: u64,
+    pub layers_needed_by_cutoff: u64,
+    pub layers_needed_by_pitr: u64,
+    pub layers_needed_by_branches: u64,
+    pub layers_needed_by_leases: u64,
+    pub layers_not_updated: u64,
+    pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
+
+    #[serde(serialize_with = "serialize_duration_as_millis")]
+    pub elapsed: Duration,
+
+    /// The layers which were garbage collected.
+    ///
+    /// Used in `/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc` to wait for the layers to be
+    /// dropped in tests.
+    #[cfg(feature = "testing")]
+    #[serde(skip)]
+    pub(crate) doomed_layers: Vec<crate::tenant::storage_layer::Layer>,
+}
+
+// helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds
+fn serialize_duration_as_millis<S>(d: &Duration, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: serde::Serializer,
+{
+    d.as_millis().serialize(serializer)
+}
+
+impl AddAssign for GcResult {
+    fn add_assign(&mut self, other: Self) {
+        self.layers_total += other.layers_total;
+        self.layers_needed_by_pitr += other.layers_needed_by_pitr;
+        self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
+        self.layers_needed_by_branches += other.layers_needed_by_branches;
+        self.layers_needed_by_leases += other.layers_needed_by_leases;
+        self.layers_not_updated += other.layers_not_updated;
+        self.layers_removed += other.layers_removed;
+
+        self.elapsed += other.elapsed;
+
+        #[cfg(feature = "testing")]
+        {
+            let mut other = other;
+            self.doomed_layers.append(&mut other.doomed_layers);
+        }
+    }
+}
diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs
index 707233b003..7f15baed10 100644
--- a/pageserver/src/tenant/layer_map.rs
+++ b/pageserver/src/tenant/layer_map.rs
@@ -48,9 +48,9 @@ mod layer_coverage;
 
 use crate::context::RequestContext;
 use crate::keyspace::KeyPartitioning;
-use crate::repository::Key;
 use crate::tenant::storage_layer::InMemoryLayer;
 use anyhow::Result;
+use pageserver_api::key::Key;
 use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
 use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze};
 use std::collections::{HashMap, VecDeque};
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 0567f8f3a7..a4c458b737 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -2811,7 +2811,7 @@ where
 }
 
 use {
-    crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
+    crate::tenant::gc_result::GcResult, pageserver_api::models::TimelineGcRequest,
     utils::http::error::ApiError,
 };
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 4a63491e90..8f4219bbbc 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -11,11 +11,11 @@ mod layer_name;
 pub mod merge_iterator;
 
 use crate::context::{AccessStatsBehavior, RequestContext};
-use crate::repository::Value;
-use crate::walrecord::NeonWalRecord;
 use bytes::Bytes;
 use pageserver_api::key::Key;
 use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
+use pageserver_api::record::NeonWalRecord;
+use pageserver_api::value::Value;
 use std::cmp::{Ordering, Reverse};
 use std::collections::hash_map::Entry;
 use std::collections::{BinaryHeap, HashMap};
diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
index 272e422c90..8a397ceb7a 100644
--- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs
+++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
@@ -5,7 +5,8 @@ use pageserver_api::key::{Key, KEY_SIZE};
 use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
 
 use crate::tenant::storage_layer::Layer;
-use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
+use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline};
+use pageserver_api::value::Value;
 
 use super::layer::S3_UPLOAD_LIMIT;
 use super::{
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 641729d681..10165b1d06 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -30,7 +30,6 @@
 use crate::config::PageServerConf;
 use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
 use crate::page_cache::{self, FileId, PAGE_SZ};
-use crate::repository::{Key, Value, KEY_SIZE};
 use crate::tenant::blob_io::BlobWriter;
 use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
 use crate::tenant::disk_btree::{
@@ -46,7 +45,7 @@ use crate::tenant::PageReconstructError;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
 use crate::virtual_file::IoBufferMut;
 use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
-use crate::{walrecord, TEMP_FILE_SUFFIX};
+use crate::TEMP_FILE_SUFFIX;
 use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use camino::{Utf8Path, Utf8PathBuf};
@@ -54,9 +53,11 @@ use futures::StreamExt;
 use itertools::Itertools;
 use pageserver_api::config::MaxVectoredReadBytes;
 use pageserver_api::key::DBDIR_KEY;
+use pageserver_api::key::{Key, KEY_SIZE};
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::ImageCompressionAlgorithm;
 use pageserver_api::shard::TenantShardId;
+use pageserver_api::value::Value;
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::collections::VecDeque;
@@ -1293,7 +1294,7 @@ impl DeltaLayerInner {
             // is it an image or will_init walrecord?
             // FIXME: this could be handled by threading the BlobRef to the
             // VectoredReadBuilder
-            let will_init = crate::repository::ValueBytes::will_init(&data)
+            let will_init = pageserver_api::value::ValueBytes::will_init(&data)
                 .inspect_err(|_e| {
                     #[cfg(feature = "testing")]
                     tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
@@ -1356,7 +1357,7 @@ impl DeltaLayerInner {
                     format!(" img {} bytes", img.len())
                 }
                 Value::WalRecord(rec) => {
-                    let wal_desc = walrecord::describe_wal_record(&rec)?;
+                    let wal_desc = pageserver_api::record::describe_wal_record(&rec)?;
                     format!(
                         " rec {} bytes will_init: {} {}",
                         buf.len(),
@@ -1610,7 +1611,6 @@ pub(crate) mod test {
     use rand::RngCore;
 
     use super::*;
-    use crate::repository::Value;
     use crate::tenant::harness::TIMELINE_ID;
     use crate::tenant::storage_layer::{Layer, ResidentLayer};
     use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
@@ -1622,6 +1622,7 @@ pub(crate) mod test {
         DEFAULT_PG_VERSION,
     };
     use bytes::Bytes;
+    use pageserver_api::value::Value;
 
     /// Construct an index for a fictional delta layer and and then
     /// traverse in order to plan vectored reads for a query. Finally,
@@ -1974,8 +1975,8 @@ pub(crate) mod test {
 
     #[tokio::test]
     async fn copy_delta_prefix_smoke() {
-        use crate::walrecord::NeonWalRecord;
         use bytes::Bytes;
+        use pageserver_api::record::NeonWalRecord;
 
         let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
             .await
@@ -2198,6 +2199,7 @@ pub(crate) mod test {
         (k1, l1).cmp(&(k2, l2))
     }
 
+    #[cfg(feature = "testing")]
     pub(crate) fn sort_delta_value(
         (k1, l1, v1): &(Key, Lsn, Value),
         (k2, l2, v2): &(Key, Lsn, Value),
diff --git a/pageserver/src/tenant/storage_layer/filter_iterator.rs b/pageserver/src/tenant/storage_layer/filter_iterator.rs
index f45dd4b801..ccfcf68e8f 100644
--- a/pageserver/src/tenant/storage_layer/filter_iterator.rs
+++ b/pageserver/src/tenant/storage_layer/filter_iterator.rs
@@ -7,7 +7,7 @@ use pageserver_api::{
 };
 use utils::lsn::Lsn;
 
-use crate::repository::Value;
+use pageserver_api::value::Value;
 
 use super::merge_iterator::MergeIterator;
 
@@ -121,8 +121,8 @@ mod tests {
 
     #[tokio::test]
     async fn filter_keyspace_iterator() {
-        use crate::repository::Value;
         use bytes::Bytes;
+        use pageserver_api::value::Value;
 
         let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator")
             .await
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 3f90df312d..c0d183dc08 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -28,7 +28,6 @@
 use crate::config::PageServerConf;
 use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
 use crate::page_cache::{self, FileId, PAGE_SZ};
-use crate::repository::{Key, Value, KEY_SIZE};
 use crate::tenant::blob_io::BlobWriter;
 use crate::tenant::block_io::{BlockBuf, FileBlockReader};
 use crate::tenant::disk_btree::{
@@ -51,8 +50,10 @@ use hex;
 use itertools::Itertools;
 use pageserver_api::config::MaxVectoredReadBytes;
 use pageserver_api::key::DBDIR_KEY;
+use pageserver_api::key::{Key, KEY_SIZE};
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::shard::{ShardIdentity, TenantShardId};
+use pageserver_api::value::Value;
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::collections::VecDeque;
@@ -1125,6 +1126,7 @@ mod test {
     use pageserver_api::{
         key::Key,
         shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
+        value::Value,
     };
     use utils::{
         generation::Generation,
@@ -1134,7 +1136,6 @@
     use crate::{
         context::RequestContext,
-        repository::Value,
         tenant::{
             config::TenantConf,
             harness::{TenantHarness, TIMELINE_ID},
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 7573ddb5cc..df448a0963 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -7,7 +7,6 @@
 use crate::assert_u64_eq_usize::{u64_to_usize, U64IsUsize, UsizeIsU64};
 use crate::config::PageServerConf;
 use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
-use crate::repository::{Key, Value};
 use crate::tenant::ephemeral_file::EphemeralFile;
 use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::PageReconstructError;
@@ -16,9 +15,11 @@ use crate::{l0_flush, page_cache};
 use anyhow::{anyhow, Context, Result};
 use camino::Utf8PathBuf;
 use pageserver_api::key::CompactKey;
+use pageserver_api::key::Key;
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::InMemoryLayerInfo;
 use pageserver_api::shard::TenantShardId;
+use pageserver_api::value::Value;
 use std::collections::{BTreeMap, HashMap};
 use std::sync::{Arc, OnceLock};
 use std::time::Instant;
diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs
index 9de70f14ee..36dcc8d805 100644
--- a/pageserver/src/tenant/storage_layer/layer/tests.rs
+++ b/pageserver/src/tenant/storage_layer/layer/tests.rs
@@ -760,8 +760,8 @@ async fn evict_and_wait_does_not_wait_for_download() {
 /// Also checks that the same does not happen on a non-evicted layer (regression test).
 #[tokio::test(start_paused = true)]
 async fn eviction_cancellation_on_drop() {
-    use crate::repository::Value;
     use bytes::Bytes;
+    use pageserver_api::value::Value;
 
     // this is the runtime on which Layer spawns the blocking tasks on
     let handle = tokio::runtime::Handle::current();
@@ -782,7 +782,7 @@ async fn eviction_cancellation_on_drop() {
         let mut writer = timeline.writer().await;
         writer
             .put(
-                crate::repository::Key::from_i128(5),
+                pageserver_api::key::Key::from_i128(5),
                 Lsn(0x20),
                 &Value::Image(Bytes::from_static(b"this does not matter either")),
                 &ctx,
diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs
index a30c25d780..2097e90764 100644
--- a/pageserver/src/tenant/storage_layer/layer_desc.rs
+++ b/pageserver/src/tenant/storage_layer/layer_desc.rs
@@ -3,7 +3,7 @@ use pageserver_api::shard::TenantShardId;
 use std::ops::Range;
 use utils::{id::TimelineId, lsn::Lsn};
 
-use crate::repository::Key;
+use pageserver_api::key::Key;
 
 use super::{DeltaLayerName, ImageLayerName, LayerName};
diff --git a/pageserver/src/tenant/storage_layer/layer_name.rs b/pageserver/src/tenant/storage_layer/layer_name.rs
index 8e750e1187..2b98d74f9f 100644
--- a/pageserver/src/tenant/storage_layer/layer_name.rs
+++ b/pageserver/src/tenant/storage_layer/layer_name.rs
@@ -1,7 +1,7 @@
 //!
 //! Helper functions for dealing with filenames of the image and delta layer files.
 //!
-use crate::repository::Key;
+use pageserver_api::key::Key;
 use std::borrow::Cow;
 use std::cmp::Ordering;
 use std::fmt;
diff --git a/pageserver/src/tenant/storage_layer/merge_iterator.rs b/pageserver/src/tenant/storage_layer/merge_iterator.rs
index f91e27241d..980202f12c 100644
--- a/pageserver/src/tenant/storage_layer/merge_iterator.rs
+++ b/pageserver/src/tenant/storage_layer/merge_iterator.rs
@@ -7,7 +7,8 @@ use anyhow::bail;
 use pageserver_api::key::Key;
 use utils::lsn::Lsn;
 
-use crate::{context::RequestContext, repository::Value};
+use crate::context::RequestContext;
+use pageserver_api::value::Value;
 
 use super::{
     delta_layer::{DeltaLayerInner, DeltaLayerIterator},
@@ -291,12 +292,16 @@ mod tests {
     use crate::{
         tenant::{
             harness::{TenantHarness, TIMELINE_ID},
-            storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
+            storage_layer::delta_layer::test::{produce_delta_layer, sort_delta},
         },
-        walrecord::NeonWalRecord,
         DEFAULT_PG_VERSION,
     };
 
+    #[cfg(feature = "testing")]
+    use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
+    #[cfg(feature = "testing")]
+    use pageserver_api::record::NeonWalRecord;
+
     async fn assert_merge_iter_equal(
         merge_iter: &mut MergeIterator<'_>,
         expect: &[(Key, Lsn, Value)],
@@ -319,8 +324,8 @@ mod tests {
 
     #[tokio::test]
     async fn merge_in_between() {
-        use crate::repository::Value;
         use bytes::Bytes;
+        use pageserver_api::value::Value;
 
         let harness = TenantHarness::create("merge_iterator_merge_in_between")
             .await
@@ -384,8 +389,8 @@ mod tests {
 
     #[tokio::test]
     async fn delta_merge() {
-        use crate::repository::Value;
         use bytes::Bytes;
+        use pageserver_api::value::Value;
 
         let harness = TenantHarness::create("merge_iterator_delta_merge")
             .await
@@ -458,10 +463,11 @@ mod tests {
         // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
     }
 
+    #[cfg(feature = "testing")]
     #[tokio::test]
     async fn delta_image_mixed_merge() {
-        use crate::repository::Value;
         use bytes::Bytes;
+        use pageserver_api::value::Value;
 
         let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
             .await
@@ -586,5 +592,6 @@ mod tests {
         is_send(merge_iter);
     }
 
+    #[cfg(feature = "testing")]
     fn is_send(_: impl Send) {}
 }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index f8d61dac5e..d765a7c987 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -125,11 +125,12 @@ use utils::{
     simple_rcu::{Rcu, RcuReadGuard},
 };
 
-use crate::repository::GcResult;
-use crate::repository::{Key, Value};
 use crate::task_mgr;
 use crate::task_mgr::TaskKind;
+use crate::tenant::gc_result::GcResult;
 use crate::ZERO_PAGE;
+use pageserver_api::key::Key;
+use pageserver_api::value::Value;
 
 use self::delete::DeleteTimelineFlow;
 pub(super) use self::eviction_task::EvictionTaskTenantState;
@@ -5822,17 +5823,15 @@ fn is_send() {
 #[cfg(test)]
 mod tests {
     use pageserver_api::key::Key;
+    use pageserver_api::value::Value;
     use utils::{id::TimelineId, lsn::Lsn};
 
-    use crate::{
-        repository::Value,
-        tenant::{
-            harness::{test_img, TenantHarness},
-            layer_map::LayerMap,
-            storage_layer::{Layer, LayerName},
-            timeline::{DeltaLayerTestDesc, EvictionError},
-            Timeline,
-        },
+    use crate::tenant::{
+        harness::{test_img, TenantHarness},
+        layer_map::LayerMap,
+        storage_layer::{Layer, LayerName},
+        timeline::{DeltaLayerTestDesc, EvictionError},
+        Timeline,
     };
 
     #[tokio::test]
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 73e4f0e87c..70f93656cd 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -49,9 +49,10 @@ use pageserver_api::config::tenant_conf_defaults::{
     DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
 };
 
-use crate::keyspace::KeySpace;
-use crate::repository::{Key, Value};
-use crate::walrecord::NeonWalRecord;
+use pageserver_api::key::Key;
+use pageserver_api::keyspace::KeySpace;
+use pageserver_api::record::NeonWalRecord;
+use pageserver_api::value::Value;
 
 use utils::lsn::Lsn;
 
@@ -2148,7 +2149,7 @@ struct ResidentDeltaLayer(ResidentLayer);
 struct ResidentImageLayer(ResidentLayer);
 
 impl CompactionJobExecutor for TimelineAdaptor {
-    type Key = crate::repository::Key;
+    type Key = pageserver_api::key::Key;
 
     type Layer = OwnArc<PersistentLayerDesc>;
     type DeltaLayer = ResidentDeltaLayer;
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index cee259e2e0..739fadbc6b 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -31,11 +31,11 @@ use crate::{
     task_mgr::{TaskKind, WALRECEIVER_RUNTIME},
     tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
     walingest::WalIngest,
-    walrecord::{decode_wal_record, DecodedWALRecord},
 };
 use postgres_backend::is_expected_io_error;
 use postgres_connection::PgConnectionConfig;
 use postgres_ffi::waldecoder::WalStreamDecoder;
+use postgres_ffi::walrecord::{decode_wal_record, DecodedWALRecord};
 use utils::{id::NodeId, lsn::Lsn};
 use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 9e43e10801..27b3f93845 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -29,8 +29,10 @@ use std::time::Instant;
 use std::time::SystemTime;
 
 use pageserver_api::shard::ShardIdentity;
+use postgres_ffi::walrecord::*;
 use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
 use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
+use wal_decoder::models::*;
 
 use anyhow::{bail, Context, Result};
 use bytes::{Buf, Bytes, BytesMut};
@@ -44,9 +46,9 @@ use crate::pgdatadir_mapping::{DatadirModification, Version};
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::tenant::PageReconstructError;
 use crate::tenant::Timeline;
-use crate::walrecord::*;
 use crate::ZERO_PAGE;
 use pageserver_api::key::rel_block_to_key;
+use pageserver_api::record::NeonWalRecord;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
 use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
@@ -108,143 +110,6 @@ struct WarnIngestLag {
     timestamp_invalid_msg_ratelimit: RateLimit,
 }
 
-// These structs are an intermediary representation of the PostgreSQL WAL records.
-// The ones prefixed with `Xl` are lower level, while the ones that are not have
-// all the required context to be acted upon by the pageserver.
-
-enum HeapamRecord {
-    ClearVmBits(ClearVmBits),
-}
-
-struct ClearVmBits {
-    new_heap_blkno: Option<u32>,
-    old_heap_blkno: Option<u32>,
-    vm_rel: RelTag,
-    flags: u8,
-}
-
-enum NeonrmgrRecord {
-    ClearVmBits(ClearVmBits),
-}
-
-enum SmgrRecord {
-    Create(SmgrCreate),
-    Truncate(XlSmgrTruncate),
-}
-
-struct SmgrCreate {
-    rel: RelTag,
-}
-
-enum DbaseRecord {
-    Create(DbaseCreate),
-    Drop(DbaseDrop),
-}
-
-struct DbaseCreate {
-    db_id: u32,
-    tablespace_id: u32,
-    src_db_id: u32,
-    src_tablespace_id: u32,
-}
-
-struct DbaseDrop {
-    db_id: u32,
-    tablespace_ids: Vec<u32>,
-}
-
-enum ClogRecord {
-    ZeroPage(ClogZeroPage),
-    Truncate(ClogTruncate),
-}
-
-struct ClogZeroPage {
-    segno: u32,
-    rpageno: u32,
-}
-
-struct ClogTruncate {
-    pageno: u32,
-    oldest_xid: u32,
-    oldest_xid_db: u32,
-}
-
-enum XactRecord {
-    Commit(XactCommon),
-    Abort(XactCommon),
-    CommitPrepared(XactCommon),
-    AbortPrepared(XactCommon),
-    Prepare(XactPrepare),
-}
-
-struct XactCommon {
-    parsed: XlXactParsedRecord,
-    origin_id: u16,
-    // Fields below are only used for logging
-    xl_xid: u32,
-    lsn: Lsn,
-}
-
-struct XactPrepare {
-    xl_xid: u32,
-    data: Bytes,
-}
-
-enum MultiXactRecord {
-    ZeroPage(MultiXactZeroPage),
-    Create(XlMultiXactCreate),
-    Truncate(XlMultiXactTruncate),
-}
-
-struct MultiXactZeroPage {
-    slru_kind: SlruKind,
-    segno: u32,
-    rpageno: u32,
-}
-
-enum RelmapRecord {
-    Update(RelmapUpdate),
-}
-
-struct RelmapUpdate {
-    update: XlRelmapUpdate,
-    buf: Bytes,
-}
-
-enum XlogRecord {
-    Raw(RawXlogRecord),
-}
-
-struct RawXlogRecord {
-    info: u8,
-    lsn: Lsn,
-    buf: Bytes,
-}
-
-enum LogicalMessageRecord {
-    Put(PutLogicalMessage),
-    #[cfg(feature = "testing")]
-    Failpoint,
-}
-
-struct PutLogicalMessage {
-    path: String,
-    buf: Bytes,
-}
-
-enum StandbyRecord {
-    RunningXacts(StandbyRunningXacts),
-}
-
-struct StandbyRunningXacts {
-    oldest_running_xid: u32,
-}
-
-enum ReploriginRecord {
-    Set(XlReploriginSet),
-    Drop(XlReploriginDrop),
-}
-
 impl WalIngest {
     pub async fn new(
         timeline: &Timeline,
@@ -284,7 +149,6 @@ impl WalIngest {
     /// relations/pages that the record affects.
     ///
     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
-    ///
     pub async fn ingest_record(
         &mut self,
         decoded: DecodedWALRecord,
@@ -2218,7 +2082,7 @@
     ) -> anyhow::Result<Option<LogicalMessageRecord>> {
         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
         if info == pg_constants::XLOG_LOGICAL_MESSAGE {
-            let xlrec = crate::walrecord::XlLogicalMessage::decode(buf);
+            let xlrec = XlLogicalMessage::decode(buf);
             let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
 
             #[cfg(feature = "testing")]
@@ -2246,7 +2110,7 @@
     ) -> anyhow::Result<Option<StandbyRecord>> {
         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
         if info == pg_constants::XLOG_RUNNING_XACTS {
-            let xlrec = crate::walrecord::XlRunningXacts::decode(buf);
+            let xlrec = XlRunningXacts::decode(buf);
             return Ok(Some(StandbyRecord::RunningXacts(StandbyRunningXacts {
                 oldest_running_xid: xlrec.oldest_running_xid,
             })));
@@ -2276,10 +2140,10 @@
     ) -> anyhow::Result<Option<ReploriginRecord>> {
         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
         if info == pg_constants::XLOG_REPLORIGIN_SET {
-            let xlrec = crate::walrecord::XlReploriginSet::decode(buf);
+            let xlrec = XlReploriginSet::decode(buf);
             return Ok(Some(ReploriginRecord::Set(xlrec)));
         } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
-            let xlrec = crate::walrecord::XlReploriginDrop::decode(buf);
+            let xlrec = XlReploriginDrop::decode(buf);
             return Ok(Some(ReploriginRecord::Drop(xlrec)));
         }
 
@@ -3146,6 +3010,7 @@ mod tests {
     async fn test_ingest_real_wal() {
         use crate::tenant::harness::*;
         use postgres_ffi::waldecoder::WalStreamDecoder;
+        use postgres_ffi::walrecord::decode_wal_record;
        use postgres_ffi::WAL_SEGMENT_SIZE;
 
         // Define test data path and constants.
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index a1c9fc5651..027a6eb7d7 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -29,11 +29,11 @@ use crate::metrics::{
     WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
     WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
 };
-use crate::repository::Key;
-use crate::walrecord::NeonWalRecord;
 use anyhow::Context;
 use bytes::{Bytes, BytesMut};
+use pageserver_api::key::Key;
 use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
+use pageserver_api::record::NeonWalRecord;
 use pageserver_api::shard::TenantShardId;
 use std::future::Future;
 use std::sync::Arc;
@@ -548,9 +548,10 @@ impl PostgresRedoManager {
 #[cfg(test)]
 mod tests {
     use super::PostgresRedoManager;
-    use crate::repository::Key;
-    use crate::{config::PageServerConf, walrecord::NeonWalRecord};
+    use crate::config::PageServerConf;
     use bytes::Bytes;
+    use pageserver_api::key::Key;
+    use pageserver_api::record::NeonWalRecord;
     use pageserver_api::shard::TenantShardId;
     use std::str::FromStr;
     use tracing::Instrument;
diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs
index c067787f97..7aaa357318 100644
--- a/pageserver/src/walredo/apply_neon.rs
+++ b/pageserver/src/walredo/apply_neon.rs
@@ -1,8 +1,8 @@
-use crate::walrecord::NeonWalRecord;
 use anyhow::Context;
 use byteorder::{ByteOrder, LittleEndian};
 use bytes::BytesMut;
 use pageserver_api::key::Key;
+use pageserver_api::record::NeonWalRecord;
 use pageserver_api::reltag::SlruKind;
 use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
@@ -238,7 +238,7 @@ pub(crate) fn apply_in_neon(
             // No-op: this record will never be created in aux v2.
warn!("AuxFile record should not be created in aux v2"); } - #[cfg(test)] + #[cfg(feature = "testing")] NeonWalRecord::Test { append, clear, diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs index f3197e68b5..7e9477cfbc 100644 --- a/pageserver/src/walredo/process.rs +++ b/pageserver/src/walredo/process.rs @@ -8,10 +8,10 @@ use crate::{ metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER}, page_cache::PAGE_SZ, span::debug_assert_current_span_has_tenant_id, - walrecord::NeonWalRecord, }; use anyhow::Context; use bytes::Bytes; +use pageserver_api::record::NeonWalRecord; use pageserver_api::{reltag::RelTag, shard::TenantShardId}; use postgres_ffi::BLCKSZ; #[cfg(feature = "testing")]