From b87ac5b37555e674cf625a35d4d03dd79c93db4a Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Tue, 22 Oct 2024 11:00:10 +0200
Subject: [PATCH] pageserver: move things around to prepare for decoding logic

We wish to have the high-level WAL decoding logic in the
`wal_decoder::decoder` module. For this we need the `Value` and
`NeonWalRecord` types accessible there, so:

1. Move `Value` and `NeonWalRecord` to `pageserver_api::value` and
`pageserver_api::record` respectively. I had to add a `testing` feature
to `pageserver_api` to get this working, due to the `NeonWalRecord`
test directives.
2. Get rid of `pageserver::repository` (follow-up to (1))
3. Move PG-specific WAL record types to `postgres_ffi::record`. In
theory they could live in `wal_decoder`, but that would create a
circular dependency between `wal_decoder` and `postgres_ffi`. Long
term, it makes sense for those types to be PG-version specific, so this
placement will work out nicely.
4. Move higher-level WAL record types (to be ingested by the
pageserver) into `wal_decoder::models`
---
 Cargo.lock | 15 +
 Cargo.toml | 2 +
 libs/pageserver_api/src/lib.rs | 2 +
 libs/pageserver_api/src/record.rs | 113 ++
 .../pageserver_api/src/value.rs | 70 +-
 libs/postgres_ffi/Cargo.toml | 1 +
 libs/postgres_ffi/src/lib.rs | 1 +
 .../postgres_ffi/src/record.rs | 1168 ++++++++---------
 libs/wal_decoder/Cargo.toml | 17 +
 libs/wal_decoder/src/decoder.rs | 0
 libs/wal_decoder/src/lib.rs | 2 +
 libs/wal_decoder/src/models.rs | 144 ++
 pageserver/Cargo.toml | 4 +-
 pageserver/benches/bench_ingest.rs | 4 +-
 pageserver/benches/bench_layer_map.rs | 2 +-
 pageserver/benches/bench_walredo.rs | 3 +-
 pageserver/ctl/src/draw_timeline_dir.rs | 2 +-
 pageserver/ctl/src/layer_map_analyzer.rs | 2 +-
 pageserver/ctl/src/layers.rs | 4 +-
 pageserver/src/deletion_queue.rs | 3 +-
 pageserver/src/gc_result.rs | 57 +
 pageserver/src/http/routes.rs | 4 +-
 pageserver/src/import_datadir.rs | 4 +-
 pageserver/src/lib.rs | 3 +-
 pageserver/src/pgdatadir_mapping.rs | 6 +-
 pageserver/src/tenant.rs | 15 +-
 pageserver/src/tenant/layer_map.rs | 2 +-
 pageserver/src/tenant/mgr.rs | 2 +-
 pageserver/src/tenant/storage_layer.rs | 4 +-
 .../src/tenant/storage_layer/delta_layer.rs | 13 +-
 .../tenant/storage_layer/filter_iterator.rs | 4 +-
 .../src/tenant/storage_layer/image_layer.rs | 5 +-
 .../tenant/storage_layer/inmemory_layer.rs | 3 +-
 .../src/tenant/storage_layer/layer/tests.rs | 4 +-
 .../src/tenant/storage_layer/layer_desc.rs | 2 +-
 .../src/tenant/storage_layer/layer_name.rs | 2 +-
 .../tenant/storage_layer/merge_iterator.rs | 11 +-
 .../src/tenant/storage_layer/split_writer.rs | 3 +-
 pageserver/src/tenant/timeline.rs | 21 +-
 pageserver/src/tenant/timeline/compaction.rs | 9 +-
 .../walreceiver/walreceiver_connection.rs | 3 +-
 pageserver/src/walingest.rs | 151 +--
 pageserver/src/walredo.rs | 9 +-
 pageserver/src/walredo/apply_neon.rs | 4 +-
 pageserver/src/walredo/process.rs | 2 +-
 45 files changed, 985 insertions(+), 917 deletions(-)
 create mode 100644 libs/pageserver_api/src/record.rs
 rename pageserver/src/repository.rs => libs/pageserver_api/src/value.rs (73%)
 rename pageserver/src/walrecord.rs => libs/postgres_ffi/src/record.rs (88%)
 create mode 100644 libs/wal_decoder/Cargo.toml
 create mode 100644 libs/wal_decoder/src/decoder.rs
 create mode 100644 libs/wal_decoder/src/lib.rs
 create mode 100644 libs/wal_decoder/src/models.rs
 create mode 100644 pageserver/src/gc_result.rs

diff --git a/Cargo.lock b/Cargo.lock
index ad29fa4634..08b9d610ef 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3749,6 +3749,7 @@ dependencies = [
 "tracing",
 "url",
"utils", + "wal_decoder", "walkdir", "workspace_hack", ] @@ -4186,6 +4187,7 @@ dependencies = [ "regex", "serde", "thiserror", + "tracing", "utils", ] @@ -6954,6 +6956,19 @@ dependencies = [ "utils", ] +[[package]] +name = "wal_decoder" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "pageserver_api", + "postgres_ffi", + "serde", + "tracing", + "utils", +] + [[package]] name = "walkdir" version = "2.3.3" diff --git a/Cargo.toml b/Cargo.toml index 4c6a24ecde..7f9a766ff9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "libs/postgres_ffi/wal_craft", "libs/vm_monitor", "libs/walproposer", + "libs/wal_decoder", ] [workspace.package] @@ -238,6 +239,7 @@ tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" } utils = { version = "0.1", path = "./libs/utils/" } vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" } walproposer = { version = "0.1", path = "./libs/walproposer/" } +wal_decoder = { version = "0.1", path = "./libs/wal_decoder" } ## Common library dependency workspace_hack = { version = "0.1", path = "./workspace_hack/" } diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs index 532185a366..ff705e79cd 100644 --- a/libs/pageserver_api/src/lib.rs +++ b/libs/pageserver_api/src/lib.rs @@ -5,9 +5,11 @@ pub mod controller_api; pub mod key; pub mod keyspace; pub mod models; +pub mod record; pub mod reltag; pub mod shard; /// Public API types pub mod upcall_api; +pub mod value; pub mod config; diff --git a/libs/pageserver_api/src/record.rs b/libs/pageserver_api/src/record.rs new file mode 100644 index 0000000000..03817a8d20 --- /dev/null +++ b/libs/pageserver_api/src/record.rs @@ -0,0 +1,113 @@ +//! This module defines the WAL record format used within the pageserver. + +use bytes::Bytes; +use postgres_ffi::record::{describe_postgres_wal_record, MultiXactMember}; +use postgres_ffi::{MultiXactId, MultiXactOffset, TimestampTz, TransactionId}; +use serde::{Deserialize, Serialize}; +use utils::bin_ser::DeserializeError; + +/// Each update to a page is represented by a NeonWalRecord. It can be a wrapper +/// around a PostgreSQL WAL record, or a custom neon-specific "record". +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum NeonWalRecord { + /// Native PostgreSQL WAL record + Postgres { will_init: bool, rec: Bytes }, + + /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear) + ClearVisibilityMapFlags { + new_heap_blkno: Option, + old_heap_blkno: Option, + flags: u8, + }, + /// Mark transaction IDs as committed on a CLOG page + ClogSetCommitted { + xids: Vec, + timestamp: TimestampTz, + }, + /// Mark transaction IDs as aborted on a CLOG page + ClogSetAborted { xids: Vec }, + /// Extend multixact offsets SLRU + MultixactOffsetCreate { + mid: MultiXactId, + moff: MultiXactOffset, + }, + /// Extend multixact members SLRU. + MultixactMembersCreate { + moff: MultiXactOffset, + members: Vec, + }, + /// Update the map of AUX files, either writing or dropping an entry + AuxFile { + file_path: String, + content: Option, + }, + + /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it. + #[cfg(feature = "testing")] + Test { + /// Append a string to the image. + append: String, + /// Clear the image before appending. + clear: bool, + /// Treat this record as an init record. `clear` should be set to true if this field is set + /// to true. This record does not need the history WALs to reconstruct. 
See [`NeonWalRecord::will_init`] and
+        /// its references in `timeline.rs`.
+        will_init: bool,
+    },
+}
+
+impl NeonWalRecord {
+    /// Does replaying this WAL record initialize the page from scratch, or does
+    /// it need to be applied over the previous image of the page?
+    pub fn will_init(&self) -> bool {
+        // If you change this function, you'll also need to change ValueBytes::will_init
+        match self {
+            NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
+            #[cfg(feature = "testing")]
+            NeonWalRecord::Test { will_init, .. } => *will_init,
+            // None of the special neon record types currently initialize the page
+            _ => false,
+        }
+    }
+
+    #[cfg(feature = "testing")]
+    pub fn wal_append(s: impl AsRef<str>) -> Self {
+        Self::Test {
+            append: s.as_ref().to_string(),
+            clear: false,
+            will_init: false,
+        }
+    }
+
+    #[cfg(feature = "testing")]
+    pub fn wal_clear() -> Self {
+        Self::Test {
+            append: "".to_string(),
+            clear: true,
+            will_init: false,
+        }
+    }
+
+    #[cfg(feature = "testing")]
+    pub fn wal_init() -> Self {
+        Self::Test {
+            append: "".to_string(),
+            clear: true,
+            will_init: true,
+        }
+    }
+}
+
+/// Build a human-readable string to describe a WAL record
+///
+/// For debugging purposes
+pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
+    match rec {
+        NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
+            "will_init: {}, {}",
+            will_init,
+            describe_postgres_wal_record(rec)?
+        )),
+        _ => Ok(format!("{:?}", rec)),
+    }
+}
diff --git a/pageserver/src/repository.rs b/libs/pageserver_api/src/value.rs
similarity index 73%
rename from pageserver/src/repository.rs
rename to libs/pageserver_api/src/value.rs
index e4ebafd927..583c0b2a78 100644
--- a/pageserver/src/repository.rs
+++ b/libs/pageserver_api/src/value.rs
@@ -1,13 +1,9 @@
-use crate::walrecord::NeonWalRecord;
-use anyhow::Result;
+//! This module defines the value type used by the storage engine.
+
+use crate::record::NeonWalRecord;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
-use std::ops::AddAssign;
-use std::time::Duration;
-pub use pageserver_api::key::{Key, KEY_SIZE};
-
-/// A 'value' stored for a one Key.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub enum Value {
     /// An Image value contains a full copy of the value
@@ -33,17 +29,17 @@ impl Value {
 }

 #[derive(Debug, PartialEq)]
-pub(crate) enum InvalidInput {
+pub enum InvalidInput {
     TooShortValue,
     TooShortPostgresRecord,
 }

 /// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, let's
 /// use this type for querying if a slice looks some particular way.
-pub(crate) struct ValueBytes;
+pub struct ValueBytes;

 impl ValueBytes {
-    pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
+    pub fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
         if raw.len() < 12 {
             return Err(InvalidInput::TooShortValue);
         }
@@ -79,6 +75,7 @@ impl ValueBytes {
 mod test {
     use super::*;

+    use bytes::Bytes;
     use utils::bin_ser::BeSer;

     macro_rules! roundtrip {
@@ -229,56 +226,3 @@ mod test {
         assert!(!ValueBytes::will_init(&expected).unwrap());
     }
 }
-
-///
-/// Result of performing GC
-///
-#[derive(Default, Serialize, Debug)]
-pub struct GcResult {
-    pub layers_total: u64,
-    pub layers_needed_by_cutoff: u64,
-    pub layers_needed_by_pitr: u64,
-    pub layers_needed_by_branches: u64,
-    pub layers_needed_by_leases: u64,
-    pub layers_not_updated: u64,
-    pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
- - #[serde(serialize_with = "serialize_duration_as_millis")] - pub elapsed: Duration, - - /// The layers which were garbage collected. - /// - /// Used in `/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc` to wait for the layers to be - /// dropped in tests. - #[cfg(feature = "testing")] - #[serde(skip)] - pub(crate) doomed_layers: Vec, -} - -// helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds -fn serialize_duration_as_millis(d: &Duration, serializer: S) -> Result -where - S: serde::Serializer, -{ - d.as_millis().serialize(serializer) -} - -impl AddAssign for GcResult { - fn add_assign(&mut self, other: Self) { - self.layers_total += other.layers_total; - self.layers_needed_by_pitr += other.layers_needed_by_pitr; - self.layers_needed_by_cutoff += other.layers_needed_by_cutoff; - self.layers_needed_by_branches += other.layers_needed_by_branches; - self.layers_needed_by_leases += other.layers_needed_by_leases; - self.layers_not_updated += other.layers_not_updated; - self.layers_removed += other.layers_removed; - - self.elapsed += other.elapsed; - - #[cfg(feature = "testing")] - { - let mut other = other; - self.doomed_layers.append(&mut other.doomed_layers); - } - } -} diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index ef17833a48..e1f5443cbe 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -15,6 +15,7 @@ memoffset.workspace = true thiserror.workspace = true serde.workspace = true utils.workspace = true +tracing.workspace = true [dev-dependencies] env_logger.workspace = true diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 0d46ed6aac..2401363322 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -216,6 +216,7 @@ macro_rules! enum_pgversion { } pub mod pg_constants; +pub mod record; pub mod relfile_utils; // Export some widely used datatypes that are unlikely to change across Postgres versions diff --git a/pageserver/src/walrecord.rs b/libs/postgres_ffi/src/record.rs similarity index 88% rename from pageserver/src/walrecord.rs rename to libs/postgres_ffi/src/record.rs index dd199e2c55..36cfcf658b 100644 --- a/pageserver/src/walrecord.rs +++ b/libs/postgres_ffi/src/record.rs @@ -1,107 +1,263 @@ +//! This module houses types used in decoding of PG WAL +//! records. //! -//! Functions for parsing WAL records. -//! +//! TODO: Generate separate types for each supported PG version -use anyhow::Result; +use crate::pg_constants; +use crate::XLogRecord; +use crate::{ + BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId, TimestampTz, + TransactionId, +}; +use crate::{BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD}; use bytes::{Buf, Bytes}; -use postgres_ffi::dispatch_pgversion; -use postgres_ffi::pg_constants; -use postgres_ffi::BLCKSZ; -use postgres_ffi::{BlockNumber, TimestampTz}; -use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; -use postgres_ffi::{RepOriginId, XLogRecord, XLOG_SIZE_OF_XLOG_RECORD}; use serde::{Deserialize, Serialize}; -use tracing::*; -use utils::{bin_ser::DeserializeError, lsn::Lsn}; +use utils::bin_ser::DeserializeError; +use utils::lsn::Lsn; -/// Each update to a page is represented by a NeonWalRecord. It can be a wrapper -/// around a PostgreSQL WAL record, or a custom neon-specific "record". 
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum NeonWalRecord { - /// Native PostgreSQL WAL record - Postgres { will_init: bool, rec: Bytes }, +/// +/// Note: Parsing some fields is missing, because they're not needed. +/// +/// This is similar to the xl_xact_parsed_commit and +/// xl_xact_parsed_abort structs in PostgreSQL, but we use the same +/// struct for commits and aborts. +/// +#[derive(Debug)] +pub struct XlXactParsedRecord { + pub xid: TransactionId, + pub info: u8, + pub xact_time: TimestampTz, + pub xinfo: u32, - /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear) - ClearVisibilityMapFlags { - new_heap_blkno: Option, - old_heap_blkno: Option, - flags: u8, - }, - /// Mark transaction IDs as committed on a CLOG page - ClogSetCommitted { - xids: Vec, - timestamp: TimestampTz, - }, - /// Mark transaction IDs as aborted on a CLOG page - ClogSetAborted { xids: Vec }, - /// Extend multixact offsets SLRU - MultixactOffsetCreate { - mid: MultiXactId, - moff: MultiXactOffset, - }, - /// Extend multixact members SLRU. - MultixactMembersCreate { - moff: MultiXactOffset, - members: Vec, - }, - /// Update the map of AUX files, either writing or dropping an entry - AuxFile { - file_path: String, - content: Option, - }, + pub db_id: Oid, + /* MyDatabaseId */ + pub ts_id: Oid, + /* MyDatabaseTableSpace */ + pub subxacts: Vec, - /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it. - #[cfg(test)] - Test { - /// Append a string to the image. - append: String, - /// Clear the image before appending. - clear: bool, - /// Treat this record as an init record. `clear` should be set to true if this field is set - /// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and - /// its references in `timeline.rs`. - will_init: bool, - }, + pub xnodes: Vec, + pub origin_lsn: Lsn, } -impl NeonWalRecord { - /// Does replaying this WAL record initialize the page from scratch, or does - /// it need to be applied over the previous image of the page? - pub fn will_init(&self) -> bool { - // If you change this function, you'll also need to change ValueBytes::will_init - match self { - NeonWalRecord::Postgres { will_init, rec: _ } => *will_init, - #[cfg(test)] - NeonWalRecord::Test { will_init, .. } => *will_init, - // None of the special neon record types currently initialize the page - _ => false, +impl XlXactParsedRecord { + /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED + /// record. 
This should agree with the ParseCommitRecord and ParseAbortRecord + /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c) + pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord { + let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + // The record starts with time of commit/abort + let xact_time = buf.get_i64_le(); + let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + buf.get_u32_le() + } else { + 0 + }; + let db_id; + let ts_id; + if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { + db_id = buf.get_u32_le(); + ts_id = buf.get_u32_le(); + } else { + db_id = 0; + ts_id = 0; + } + let mut subxacts = Vec::::new(); + if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { + let nsubxacts = buf.get_i32_le(); + for _i in 0..nsubxacts { + let subxact = buf.get_u32_le(); + subxacts.push(subxact); + } + } + let mut xnodes = Vec::::new(); + if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { + let nrels = buf.get_i32_le(); + for _i in 0..nrels { + let spcnode = buf.get_u32_le(); + let dbnode = buf.get_u32_le(); + let relnode = buf.get_u32_le(); + tracing::trace!( + "XLOG_XACT_COMMIT relfilenode {}/{}/{}", + spcnode, + dbnode, + relnode + ); + xnodes.push(RelFileNode { + spcnode, + dbnode, + relnode, + }); + } + } + + if xinfo & crate::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 { + let nitems = buf.get_i32_le(); + tracing::debug!( + "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}", + nitems + ); + let sizeof_xl_xact_stats_item = 12; + buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap()); + } + + if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 { + let nmsgs = buf.get_i32_le(); + let sizeof_shared_invalidation_message = 16; + buf.advance( + (nmsgs * sizeof_shared_invalidation_message) + .try_into() + .unwrap(), + ); + } + + if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { + xid = buf.get_u32_le(); + tracing::debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid); + } + + let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 { + Lsn(buf.get_u64_le()) + } else { + Lsn::INVALID + }; + XlXactParsedRecord { + xid, + info, + xact_time, + xinfo, + db_id, + ts_id, + subxacts, + xnodes, + origin_lsn, } } +} - #[cfg(test)] - pub(crate) fn wal_append(s: impl AsRef) -> Self { - Self::Test { - append: s.as_ref().to_string(), - clear: false, - will_init: false, +#[repr(C)] +#[derive(Debug)] +pub struct XlMultiXactCreate { + pub mid: MultiXactId, + /* new MultiXact's ID */ + pub moff: MultiXactOffset, + /* its starting offset in members file */ + pub nmembers: u32, + /* number of member XIDs */ + pub members: Vec, +} + +impl XlMultiXactCreate { + pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate { + let mid = buf.get_u32_le(); + let moff = buf.get_u32_le(); + let nmembers = buf.get_u32_le(); + let mut members = Vec::new(); + for _ in 0..nmembers { + members.push(MultiXactMember::decode(buf)); + } + XlMultiXactCreate { + mid, + moff, + nmembers, + members, } } +} - #[cfg(test)] - pub(crate) fn wal_clear() -> Self { - Self::Test { - append: "".to_string(), - clear: true, - will_init: false, +#[repr(C)] +#[derive(Debug)] +pub struct XlMultiXactTruncate { + pub oldest_multi_db: Oid, + /* to-be-truncated range of multixact offsets */ + pub start_trunc_off: MultiXactId, + /* just for completeness' sake */ + pub end_trunc_off: MultiXactId, + + /* to-be-truncated range of multixact members */ + pub start_trunc_memb: MultiXactOffset, + pub end_trunc_memb: MultiXactOffset, +} + +impl 
XlMultiXactTruncate { + pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate { + XlMultiXactTruncate { + oldest_multi_db: buf.get_u32_le(), + start_trunc_off: buf.get_u32_le(), + end_trunc_off: buf.get_u32_le(), + start_trunc_memb: buf.get_u32_le(), + end_trunc_memb: buf.get_u32_le(), } } +} - #[cfg(test)] - pub(crate) fn wal_init() -> Self { - Self::Test { - append: "".to_string(), - clear: true, - will_init: true, +#[repr(C)] +#[derive(Debug)] +pub struct XlRelmapUpdate { + pub dbid: Oid, /* database ID, or 0 for shared map */ + pub tsid: Oid, /* database's tablespace, or pg_global */ + pub nbytes: i32, /* size of relmap data */ +} + +impl XlRelmapUpdate { + pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate { + XlRelmapUpdate { + dbid: buf.get_u32_le(), + tsid: buf.get_u32_le(), + nbytes: buf.get_i32_le(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlReploriginDrop { + pub node_id: RepOriginId, +} + +impl XlReploriginDrop { + pub fn decode(buf: &mut Bytes) -> XlReploriginDrop { + XlReploriginDrop { + node_id: buf.get_u16_le(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlReploriginSet { + pub remote_lsn: Lsn, + pub node_id: RepOriginId, +} + +impl XlReploriginSet { + pub fn decode(buf: &mut Bytes) -> XlReploriginSet { + XlReploriginSet { + remote_lsn: Lsn(buf.get_u64_le()), + node_id: buf.get_u16_le(), + } + } +} + +#[repr(C)] +#[derive(Debug, Clone, Copy)] +pub struct RelFileNode { + pub spcnode: Oid, /* tablespace */ + pub dbnode: Oid, /* database */ + pub relnode: Oid, /* relation */ +} + +#[repr(C)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct MultiXactMember { + pub xid: TransactionId, + pub status: MultiXactStatus, +} + +impl MultiXactMember { + pub fn decode(buf: &mut Bytes) -> MultiXactMember { + MultiXactMember { + xid: buf.get_u32_le(), + status: buf.get_u32_le(), } } } @@ -164,17 +320,17 @@ impl DecodedWALRecord { /// Check if this WAL record represents a legacy "copy" database creation, which populates new relations /// by reading other existing relations' data blocks. This is more complex to apply than new-style database /// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case. 
- pub(crate) fn is_dbase_create_copy(&self, pg_version: u32) -> bool { + pub fn is_dbase_create_copy(&self, pg_version: u32) -> bool { if self.xl_rmid == pg_constants::RM_DBASE_ID { let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK; match pg_version { 14 => { // Postgres 14 database creations are always the legacy kind - info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE + info == crate::v14::bindings::XLOG_DBASE_CREATE } - 15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY, - 16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY, - 17 => info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY, + 15 => info == crate::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY, + 16 => info == crate::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY, + 17 => info == crate::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY, _ => { panic!("Unsupported postgres version {pg_version}") } @@ -185,35 +341,294 @@ impl DecodedWALRecord { } } -#[repr(C)] -#[derive(Debug, Clone, Copy)] -pub struct RelFileNode { - pub spcnode: Oid, /* tablespace */ - pub dbnode: Oid, /* database */ - pub relnode: Oid, /* relation */ -} +/// Main routine to decode a WAL record and figure out which blocks are modified +// +// See xlogrecord.h for details +// The overall layout of an XLOG record is: +// Fixed-size header (XLogRecord struct) +// XLogRecordBlockHeader struct +// If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows +// If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an +// XLogRecordBlockCompressHeader struct follows. +// If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows +// BlockNumber follows +// XLogRecordBlockHeader struct +// ... +// XLogRecordDataHeader[Short|Long] struct +// block data +// block data +// ... +// main data +// +// +// For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in. +// It would be more natural for this function to return a DecodedWALRecord as return value, +// but reusing the caller-supplied struct avoids an allocation. +// This code is in the hot path for digesting incoming WAL, and is very performance sensitive. +// +pub fn decode_wal_record( + record: Bytes, + decoded: &mut DecodedWALRecord, + pg_version: u32, +) -> anyhow::Result<()> { + let mut rnode_spcnode: u32 = 0; + let mut rnode_dbnode: u32 = 0; + let mut rnode_relnode: u32 = 0; + let mut got_rnode = false; + let mut origin_id: u16 = 0; -#[repr(C)] -#[derive(Debug)] -pub struct XlRelmapUpdate { - pub dbid: Oid, /* database ID, or 0 for shared map */ - pub tsid: Oid, /* database's tablespace, or pg_global */ - pub nbytes: i32, /* size of relmap data */ -} + let mut buf = record.clone(); -impl XlRelmapUpdate { - pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate { - XlRelmapUpdate { - dbid: buf.get_u32_le(), - tsid: buf.get_u32_le(), - nbytes: buf.get_i32_le(), + // 1. Parse XLogRecord struct + + // FIXME: assume little-endian here + let xlogrec = XLogRecord::from_bytes(&mut buf)?; + + tracing::trace!( + "decode_wal_record xl_rmid = {} xl_info = {}", + xlogrec.xl_rmid, + xlogrec.xl_info + ); + + let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD; + + if buf.remaining() != remaining { + //TODO error + } + + let mut max_block_id = 0; + let mut blocks_total_len: u32 = 0; + let mut main_data_len = 0; + let mut datatotal: u32 = 0; + decoded.blocks.clear(); + + // 2. Decode the headers. 
+ // XLogRecordBlockHeaders if any, + // XLogRecordDataHeader[Short|Long] + while buf.remaining() > datatotal as usize { + let block_id = buf.get_u8(); + + match block_id { + pg_constants::XLR_BLOCK_ID_DATA_SHORT => { + /* XLogRecordDataHeaderShort */ + main_data_len = buf.get_u8() as u32; + datatotal += main_data_len; + } + + pg_constants::XLR_BLOCK_ID_DATA_LONG => { + /* XLogRecordDataHeaderLong */ + main_data_len = buf.get_u32_le(); + datatotal += main_data_len; + } + + pg_constants::XLR_BLOCK_ID_ORIGIN => { + // RepOriginId is uint16 + origin_id = buf.get_u16_le(); + } + + pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => { + // TransactionId is uint32 + buf.advance(4); + } + + 0..=pg_constants::XLR_MAX_BLOCK_ID => { + /* XLogRecordBlockHeader */ + let mut blk = DecodedBkpBlock::new(); + + if block_id <= max_block_id { + // TODO + //report_invalid_record(state, + // "out-of-order block_id %u at %X/%X", + // block_id, + // (uint32) (state->ReadRecPtr >> 32), + // (uint32) state->ReadRecPtr); + // goto err; + } + max_block_id = block_id; + + let fork_flags: u8 = buf.get_u8(); + blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK; + blk.flags = fork_flags; + blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0; + blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0; + blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0; + blk.data_len = buf.get_u16_le(); + + /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */ + + datatotal += blk.data_len as u32; + blocks_total_len += blk.data_len as u32; + + if blk.has_image { + blk.bimg_len = buf.get_u16_le(); + blk.hole_offset = buf.get_u16_le(); + blk.bimg_info = buf.get_u8(); + + blk.apply_image = dispatch_pgversion!( + pg_version, + (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0 + ); + + let blk_img_is_compressed = + crate::bkpimage_is_compressed(blk.bimg_info, pg_version); + + if blk_img_is_compressed { + tracing::debug!("compressed block image , pg_version = {}", pg_version); + } + + if blk_img_is_compressed { + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 { + blk.hole_length = buf.get_u16_le(); + } else { + blk.hole_length = 0; + } + } else { + blk.hole_length = BLCKSZ - blk.bimg_len; + } + datatotal += blk.bimg_len as u32; + blocks_total_len += blk.bimg_len as u32; + + /* + * cross-check that hole_offset > 0, hole_length > 0 and + * bimg_len < BLCKSZ if the HAS_HOLE flag is set. + */ + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 + && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ) + { + // TODO + /* + report_invalid_record(state, + "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", + (unsigned int) blk->hole_offset, + (unsigned int) blk->hole_length, + (unsigned int) blk->bimg_len, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + */ + } + + /* + * cross-check that hole_offset == 0 and hole_length == 0 if + * the HAS_HOLE flag is not set. + */ + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 + && (blk.hole_offset != 0 || blk.hole_length != 0) + { + // TODO + /* + report_invalid_record(state, + "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", + (unsigned int) blk->hole_offset, + (unsigned int) blk->hole_length, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + */ + } + + /* + * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED + * flag is set. 
+ */ + if !blk_img_is_compressed && blk.bimg_len == BLCKSZ { + // TODO + /* + report_invalid_record(state, + "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X", + (unsigned int) blk->bimg_len, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + */ + } + + /* + * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor + * IS_COMPRESSED flag is set. + */ + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 + && !blk_img_is_compressed + && blk.bimg_len != BLCKSZ + { + // TODO + /* + report_invalid_record(state, + "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X", + (unsigned int) blk->data_len, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + */ + } + } + if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 { + rnode_spcnode = buf.get_u32_le(); + rnode_dbnode = buf.get_u32_le(); + rnode_relnode = buf.get_u32_le(); + got_rnode = true; + } else if !got_rnode { + // TODO + /* + report_invalid_record(state, + "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; */ + } + + blk.rnode_spcnode = rnode_spcnode; + blk.rnode_dbnode = rnode_dbnode; + blk.rnode_relnode = rnode_relnode; + + blk.blkno = buf.get_u32_le(); + tracing::trace!( + "this record affects {}/{}/{} blk {}", + rnode_spcnode, + rnode_dbnode, + rnode_relnode, + blk.blkno + ); + + decoded.blocks.push(blk); + } + + _ => { + // TODO: invalid block_id + } } } + + // 3. Decode blocks. + let mut ptr = record.len() - buf.remaining(); + for blk in decoded.blocks.iter_mut() { + if blk.has_image { + blk.bimg_offset = ptr as u32; + ptr += blk.bimg_len as usize; + } + if blk.has_data { + ptr += blk.data_len as usize; + } + } + // We don't need them, so just skip blocks_total_len bytes + buf.advance(blocks_total_len as usize); + assert_eq!(ptr, record.len() - buf.remaining()); + + let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize; + + // 4. Decode main_data + if main_data_len > 0 { + assert_eq!(buf.remaining(), main_data_len as usize); + } + + decoded.xl_xid = xlogrec.xl_xid; + decoded.xl_info = xlogrec.xl_info; + decoded.xl_rmid = xlogrec.xl_rmid; + decoded.record = record; + decoded.origin_id = origin_id; + decoded.main_data_offset = main_data_offset; + + Ok(()) } pub mod v14 { + use crate::{OffsetNumber, TransactionId}; use bytes::{Buf, Bytes}; - use postgres_ffi::{OffsetNumber, TransactionId}; #[repr(C)] #[derive(Debug)] @@ -383,8 +798,8 @@ pub mod v15 { pub mod v16 { pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange}; + use crate::{OffsetNumber, TransactionId}; use bytes::{Buf, Bytes}; - use postgres_ffi::{OffsetNumber, TransactionId}; pub struct XlHeapDelete { pub xmax: TransactionId, @@ -450,8 +865,8 @@ pub mod v16 { /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. 
*/ pub mod rm_neon { + use crate::{OffsetNumber, TransactionId}; use bytes::{Buf, Bytes}; - use postgres_ffi::{OffsetNumber, TransactionId}; #[repr(C)] #[derive(Debug)] @@ -563,8 +978,8 @@ pub mod v16 { pub mod v17 { pub use super::v14::XlHeapLockUpdated; + pub use crate::{TimeLineID, TimestampTz}; use bytes::{Buf, Bytes}; - pub use postgres_ffi::{TimeLineID, TimestampTz}; pub use super::v16::rm_neon; pub use super::v16::{ @@ -681,125 +1096,6 @@ impl XlDropDatabase { } } -/// -/// Note: Parsing some fields is missing, because they're not needed. -/// -/// This is similar to the xl_xact_parsed_commit and -/// xl_xact_parsed_abort structs in PostgreSQL, but we use the same -/// struct for commits and aborts. -/// -#[derive(Debug)] -pub struct XlXactParsedRecord { - pub xid: TransactionId, - pub info: u8, - pub xact_time: TimestampTz, - pub xinfo: u32, - - pub db_id: Oid, - /* MyDatabaseId */ - pub ts_id: Oid, - /* MyDatabaseTableSpace */ - pub subxacts: Vec, - - pub xnodes: Vec, - pub origin_lsn: Lsn, -} - -impl XlXactParsedRecord { - /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED - /// record. This should agree with the ParseCommitRecord and ParseAbortRecord - /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c) - pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord { - let info = xl_info & pg_constants::XLOG_XACT_OPMASK; - // The record starts with time of commit/abort - let xact_time = buf.get_i64_le(); - let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { - buf.get_u32_le() - } else { - 0 - }; - let db_id; - let ts_id; - if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { - db_id = buf.get_u32_le(); - ts_id = buf.get_u32_le(); - } else { - db_id = 0; - ts_id = 0; - } - let mut subxacts = Vec::::new(); - if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { - let nsubxacts = buf.get_i32_le(); - for _i in 0..nsubxacts { - let subxact = buf.get_u32_le(); - subxacts.push(subxact); - } - } - let mut xnodes = Vec::::new(); - if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { - let nrels = buf.get_i32_le(); - for _i in 0..nrels { - let spcnode = buf.get_u32_le(); - let dbnode = buf.get_u32_le(); - let relnode = buf.get_u32_le(); - trace!( - "XLOG_XACT_COMMIT relfilenode {}/{}/{}", - spcnode, - dbnode, - relnode - ); - xnodes.push(RelFileNode { - spcnode, - dbnode, - relnode, - }); - } - } - - if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 { - let nitems = buf.get_i32_le(); - debug!( - "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}", - nitems - ); - let sizeof_xl_xact_stats_item = 12; - buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap()); - } - - if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 { - let nmsgs = buf.get_i32_le(); - let sizeof_shared_invalidation_message = 16; - buf.advance( - (nmsgs * sizeof_shared_invalidation_message) - .try_into() - .unwrap(), - ); - } - - if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { - xid = buf.get_u32_le(); - debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid); - } - - let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 { - Lsn(buf.get_u64_le()) - } else { - Lsn::INVALID - }; - XlXactParsedRecord { - xid, - info, - xact_time, - xinfo, - db_id, - ts_id, - subxacts, - xnodes, - origin_lsn, - } - } -} - #[repr(C)] #[derive(Debug)] pub struct XlClogTruncate { @@ -822,78 +1118,6 @@ impl XlClogTruncate { } } -#[repr(C)] -#[derive(Debug, Clone, Serialize, Deserialize, 
PartialEq, Eq)] -pub struct MultiXactMember { - pub xid: TransactionId, - pub status: MultiXactStatus, -} - -impl MultiXactMember { - pub fn decode(buf: &mut Bytes) -> MultiXactMember { - MultiXactMember { - xid: buf.get_u32_le(), - status: buf.get_u32_le(), - } - } -} - -#[repr(C)] -#[derive(Debug)] -pub struct XlMultiXactCreate { - pub mid: MultiXactId, - /* new MultiXact's ID */ - pub moff: MultiXactOffset, - /* its starting offset in members file */ - pub nmembers: u32, - /* number of member XIDs */ - pub members: Vec, -} - -impl XlMultiXactCreate { - pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate { - let mid = buf.get_u32_le(); - let moff = buf.get_u32_le(); - let nmembers = buf.get_u32_le(); - let mut members = Vec::new(); - for _ in 0..nmembers { - members.push(MultiXactMember::decode(buf)); - } - XlMultiXactCreate { - mid, - moff, - nmembers, - members, - } - } -} - -#[repr(C)] -#[derive(Debug)] -pub struct XlMultiXactTruncate { - pub oldest_multi_db: Oid, - /* to-be-truncated range of multixact offsets */ - pub start_trunc_off: MultiXactId, - /* just for completeness' sake */ - pub end_trunc_off: MultiXactId, - - /* to-be-truncated range of multixact members */ - pub start_trunc_memb: MultiXactOffset, - pub end_trunc_memb: MultiXactOffset, -} - -impl XlMultiXactTruncate { - pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate { - XlMultiXactTruncate { - oldest_multi_db: buf.get_u32_le(), - start_trunc_off: buf.get_u32_le(), - end_trunc_off: buf.get_u32_le(), - start_trunc_memb: buf.get_u32_le(), - end_trunc_memb: buf.get_u32_le(), - } - } -} - #[repr(C)] #[derive(Debug)] pub struct XlLogicalMessage { @@ -950,337 +1174,7 @@ impl XlRunningXacts { } } -#[repr(C)] -#[derive(Debug)] -pub struct XlReploriginDrop { - pub node_id: RepOriginId, -} - -impl XlReploriginDrop { - pub fn decode(buf: &mut Bytes) -> XlReploriginDrop { - XlReploriginDrop { - node_id: buf.get_u16_le(), - } - } -} - -#[repr(C)] -#[derive(Debug)] -pub struct XlReploriginSet { - pub remote_lsn: Lsn, - pub node_id: RepOriginId, -} - -impl XlReploriginSet { - pub fn decode(buf: &mut Bytes) -> XlReploriginSet { - XlReploriginSet { - remote_lsn: Lsn(buf.get_u64_le()), - node_id: buf.get_u16_le(), - } - } -} - -/// Main routine to decode a WAL record and figure out which blocks are modified -// -// See xlogrecord.h for details -// The overall layout of an XLOG record is: -// Fixed-size header (XLogRecord struct) -// XLogRecordBlockHeader struct -// If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows -// If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an -// XLogRecordBlockCompressHeader struct follows. -// If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows -// BlockNumber follows -// XLogRecordBlockHeader struct -// ... -// XLogRecordDataHeader[Short|Long] struct -// block data -// block data -// ... -// main data -// -// -// For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in. -// It would be more natural for this function to return a DecodedWALRecord as return value, -// but reusing the caller-supplied struct avoids an allocation. -// This code is in the hot path for digesting incoming WAL, and is very performance sensitive. 
-// -pub fn decode_wal_record( - record: Bytes, - decoded: &mut DecodedWALRecord, - pg_version: u32, -) -> Result<()> { - let mut rnode_spcnode: u32 = 0; - let mut rnode_dbnode: u32 = 0; - let mut rnode_relnode: u32 = 0; - let mut got_rnode = false; - let mut origin_id: u16 = 0; - - let mut buf = record.clone(); - - // 1. Parse XLogRecord struct - - // FIXME: assume little-endian here - let xlogrec = XLogRecord::from_bytes(&mut buf)?; - - trace!( - "decode_wal_record xl_rmid = {} xl_info = {}", - xlogrec.xl_rmid, - xlogrec.xl_info - ); - - let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD; - - if buf.remaining() != remaining { - //TODO error - } - - let mut max_block_id = 0; - let mut blocks_total_len: u32 = 0; - let mut main_data_len = 0; - let mut datatotal: u32 = 0; - decoded.blocks.clear(); - - // 2. Decode the headers. - // XLogRecordBlockHeaders if any, - // XLogRecordDataHeader[Short|Long] - while buf.remaining() > datatotal as usize { - let block_id = buf.get_u8(); - - match block_id { - pg_constants::XLR_BLOCK_ID_DATA_SHORT => { - /* XLogRecordDataHeaderShort */ - main_data_len = buf.get_u8() as u32; - datatotal += main_data_len; - } - - pg_constants::XLR_BLOCK_ID_DATA_LONG => { - /* XLogRecordDataHeaderLong */ - main_data_len = buf.get_u32_le(); - datatotal += main_data_len; - } - - pg_constants::XLR_BLOCK_ID_ORIGIN => { - // RepOriginId is uint16 - origin_id = buf.get_u16_le(); - } - - pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => { - // TransactionId is uint32 - buf.advance(4); - } - - 0..=pg_constants::XLR_MAX_BLOCK_ID => { - /* XLogRecordBlockHeader */ - let mut blk = DecodedBkpBlock::new(); - - if block_id <= max_block_id { - // TODO - //report_invalid_record(state, - // "out-of-order block_id %u at %X/%X", - // block_id, - // (uint32) (state->ReadRecPtr >> 32), - // (uint32) state->ReadRecPtr); - // goto err; - } - max_block_id = block_id; - - let fork_flags: u8 = buf.get_u8(); - blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK; - blk.flags = fork_flags; - blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0; - blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0; - blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0; - blk.data_len = buf.get_u16_le(); - - /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */ - - datatotal += blk.data_len as u32; - blocks_total_len += blk.data_len as u32; - - if blk.has_image { - blk.bimg_len = buf.get_u16_le(); - blk.hole_offset = buf.get_u16_le(); - blk.bimg_info = buf.get_u8(); - - blk.apply_image = dispatch_pgversion!( - pg_version, - (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0 - ); - - let blk_img_is_compressed = - postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version); - - if blk_img_is_compressed { - debug!("compressed block image , pg_version = {}", pg_version); - } - - if blk_img_is_compressed { - if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 { - blk.hole_length = buf.get_u16_le(); - } else { - blk.hole_length = 0; - } - } else { - blk.hole_length = BLCKSZ - blk.bimg_len; - } - datatotal += blk.bimg_len as u32; - blocks_total_len += blk.bimg_len as u32; - - /* - * cross-check that hole_offset > 0, hole_length > 0 and - * bimg_len < BLCKSZ if the HAS_HOLE flag is set. 
- */ - if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 - && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ) - { - // TODO - /* - report_invalid_record(state, - "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", - (unsigned int) blk->hole_offset, - (unsigned int) blk->hole_length, - (unsigned int) blk->bimg_len, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - - /* - * cross-check that hole_offset == 0 and hole_length == 0 if - * the HAS_HOLE flag is not set. - */ - if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 - && (blk.hole_offset != 0 || blk.hole_length != 0) - { - // TODO - /* - report_invalid_record(state, - "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", - (unsigned int) blk->hole_offset, - (unsigned int) blk->hole_length, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - - /* - * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED - * flag is set. - */ - if !blk_img_is_compressed && blk.bimg_len == BLCKSZ { - // TODO - /* - report_invalid_record(state, - "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X", - (unsigned int) blk->bimg_len, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - - /* - * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor - * IS_COMPRESSED flag is set. - */ - if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 - && !blk_img_is_compressed - && blk.bimg_len != BLCKSZ - { - // TODO - /* - report_invalid_record(state, - "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X", - (unsigned int) blk->data_len, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - } - if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 { - rnode_spcnode = buf.get_u32_le(); - rnode_dbnode = buf.get_u32_le(); - rnode_relnode = buf.get_u32_le(); - got_rnode = true; - } else if !got_rnode { - // TODO - /* - report_invalid_record(state, - "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X", - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; */ - } - - blk.rnode_spcnode = rnode_spcnode; - blk.rnode_dbnode = rnode_dbnode; - blk.rnode_relnode = rnode_relnode; - - blk.blkno = buf.get_u32_le(); - trace!( - "this record affects {}/{}/{} blk {}", - rnode_spcnode, - rnode_dbnode, - rnode_relnode, - blk.blkno - ); - - decoded.blocks.push(blk); - } - - _ => { - // TODO: invalid block_id - } - } - } - - // 3. Decode blocks. - let mut ptr = record.len() - buf.remaining(); - for blk in decoded.blocks.iter_mut() { - if blk.has_image { - blk.bimg_offset = ptr as u32; - ptr += blk.bimg_len as usize; - } - if blk.has_data { - ptr += blk.data_len as usize; - } - } - // We don't need them, so just skip blocks_total_len bytes - buf.advance(blocks_total_len as usize); - assert_eq!(ptr, record.len() - buf.remaining()); - - let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize; - - // 4. 
Decode main_data - if main_data_len > 0 { - assert_eq!(buf.remaining(), main_data_len as usize); - } - - decoded.xl_xid = xlogrec.xl_xid; - decoded.xl_info = xlogrec.xl_info; - decoded.xl_rmid = xlogrec.xl_rmid; - decoded.record = record; - decoded.origin_id = origin_id; - decoded.main_data_offset = main_data_offset; - - Ok(()) -} - -/// -/// Build a human-readable string to describe a WAL record -/// -/// For debugging purposes -pub fn describe_wal_record(rec: &NeonWalRecord) -> Result { - match rec { - NeonWalRecord::Postgres { will_init, rec } => Ok(format!( - "will_init: {}, {}", - will_init, - describe_postgres_wal_record(rec)? - )), - _ => Ok(format!("{:?}", rec)), - } -} - -fn describe_postgres_wal_record(record: &Bytes) -> Result { +pub fn describe_postgres_wal_record(record: &Bytes) -> Result { // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this. // Maybe use the postgres wal redo process, the same used for replaying WAL records? // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly, diff --git a/libs/wal_decoder/Cargo.toml b/libs/wal_decoder/Cargo.toml new file mode 100644 index 0000000000..1289061aff --- /dev/null +++ b/libs/wal_decoder/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "wal_decoder" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[features] +testing = [] + +[dependencies] +anyhow.workspace = true +bytes.workspace = true +pageserver_api.workspace = true +postgres_ffi.workspace = true +serde.workspace = true +tracing.workspace = true +utils.workspace = true diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/libs/wal_decoder/src/lib.rs b/libs/wal_decoder/src/lib.rs new file mode 100644 index 0000000000..05349d17c9 --- /dev/null +++ b/libs/wal_decoder/src/lib.rs @@ -0,0 +1,2 @@ +pub mod decoder; +pub mod models; diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs new file mode 100644 index 0000000000..00a35be274 --- /dev/null +++ b/libs/wal_decoder/src/models.rs @@ -0,0 +1,144 @@ +//! This module houses types which represent decoded PG WAL records +//! ready for the pageserver to interpret. They are higher level +//! than their counterparts in [`postgres_ffi::record`]. 
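// Editor's aside — a minimal sketch (not part of this patch) of how pageserver
// ingest code might consume the models defined below. `apply_clear_vm_bits` is
// a hypothetical placeholder, not a function introduced by this change.
fn dispatch_heapam(rec: HeapamRecord) {
    match rec {
        // Visibility-map maintenance is the only heapam-level record so far.
        HeapamRecord::ClearVmBits(clear) => apply_clear_vm_bits(clear),
    }
}

fn apply_clear_vm_bits(clear: ClearVmBits) {
    // `flags` is the bitmask of VM bits to clear for the old and/or new heap
    // blocks of `vm_rel`; `None` means that side of the update needs no VM work.
    let ClearVmBits { new_heap_blkno, old_heap_blkno, vm_rel, flags } = clear;
    let _ = (new_heap_blkno, old_heap_blkno, vm_rel, flags);
}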
+
+use bytes::Bytes;
+use pageserver_api::reltag::{RelTag, SlruKind};
+use postgres_ffi::record::{
+    XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet, XlSmgrTruncate, XlXactParsedRecord
+};
+use postgres_ffi::{Oid, TransactionId};
+use utils::lsn::Lsn;
+
+pub enum HeapamRecord {
+    ClearVmBits(ClearVmBits),
+}
+
+pub struct ClearVmBits {
+    pub new_heap_blkno: Option<u32>,
+    pub old_heap_blkno: Option<u32>,
+    pub vm_rel: RelTag,
+    pub flags: u8,
+}
+
+pub enum NeonrmgrRecord {
+    ClearVmBits(ClearVmBits),
+}
+
+pub enum SmgrRecord {
+    Create(SmgrCreate),
+    Truncate(XlSmgrTruncate),
+}
+
+pub struct SmgrCreate {
+    pub rel: RelTag,
+}
+
+pub enum DbaseRecord {
+    Create(DbaseCreate),
+    Drop(DbaseDrop),
+}
+
+pub struct DbaseCreate {
+    pub db_id: Oid,
+    pub tablespace_id: Oid,
+    pub src_db_id: Oid,
+    pub src_tablespace_id: Oid,
+}
+
+pub struct DbaseDrop {
+    pub db_id: Oid,
+    pub tablespace_ids: Vec<Oid>,
+}
+
+pub enum ClogRecord {
+    ZeroPage(ClogZeroPage),
+    Truncate(ClogTruncate),
+}
+
+pub struct ClogZeroPage {
+    pub segno: u32,
+    pub rpageno: u32,
+}
+
+pub struct ClogTruncate {
+    pub pageno: u32,
+    pub oldest_xid: TransactionId,
+    pub oldest_xid_db: Oid,
+}
+
+pub enum XactRecord {
+    Commit(XactCommon),
+    Abort(XactCommon),
+    CommitPrepared(XactCommon),
+    AbortPrepared(XactCommon),
+    Prepare(XactPrepare),
+}
+
+pub struct XactCommon {
+    pub parsed: XlXactParsedRecord,
+    pub origin_id: u16,
+    // Fields below are only used for logging
+    pub xl_xid: TransactionId,
+    pub lsn: Lsn,
+}
+
+pub struct XactPrepare {
+    pub xl_xid: TransactionId,
+    pub data: Bytes,
+}
+
+pub enum MultiXactRecord {
+    ZeroPage(MultiXactZeroPage),
+    Create(XlMultiXactCreate),
+    Truncate(XlMultiXactTruncate),
+}
+
+pub struct MultiXactZeroPage {
+    pub slru_kind: SlruKind,
+    pub segno: u32,
+    pub rpageno: u32,
+}
+
+pub enum RelmapRecord {
+    Update(RelmapUpdate),
+}
+
+pub struct RelmapUpdate {
+    pub update: XlRelmapUpdate,
+    pub buf: Bytes,
+}
+
+pub enum XlogRecord {
+    Raw(RawXlogRecord),
+}
+
+pub struct RawXlogRecord {
+    pub info: u8,
+    pub lsn: Lsn,
+    pub buf: Bytes,
+}
+
+pub enum LogicalMessageRecord {
+    Put(PutLogicalMessage),
+    #[cfg(feature = "testing")]
+    Failpoint,
+}
+
+pub struct PutLogicalMessage {
+    pub path: String,
+    pub buf: Bytes,
+}
+
+pub enum StandbyRecord {
+    RunningXacts(StandbyRunningXacts),
+}
+
+pub struct StandbyRunningXacts {
+    pub oldest_running_xid: TransactionId,
+}
+
+pub enum ReploriginRecord {
+    Set(XlReploriginSet),
+    Drop(XlReploriginDrop),
+}
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 2531abc7a1..54e3a8a479 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -8,7 +8,7 @@ license.workspace = true
 default = []
 # Enables test-only APIs, including failpoints.
In particular, enables the `fail_point!` macro,
 # which adds some runtime cost to run tests on outage conditions
-testing = ["fail/failpoints", "pageserver_api/testing" ]
+testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing"]

 [dependencies]
 anyhow.workspace = true
@@ -83,6 +83,7 @@ enum-map.workspace = true
 enumset = { workspace = true, features = ["serde"]}
 strum.workspace = true
 strum_macros.workspace = true
+wal_decoder.workspace = true

 [target.'cfg(target_os = "linux")'.dependencies]
 procfs.workspace = true
diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs
index d98b23acce..cd24f22ec8 100644
--- a/pageserver/benches/bench_ingest.rs
+++ b/pageserver/benches/bench_ingest.rs
@@ -6,15 +6,14 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use pageserver::{
     config::PageServerConf,
     context::{DownloadBehavior, RequestContext},
     l0_flush::{L0FlushConfig, L0FlushGlobalState},
     page_cache,
-    repository::Value,
     task_mgr::TaskKind,
     tenant::storage_layer::inmemory_layer::SerializedBatch,
     tenant::storage_layer::InMemoryLayer,
     virtual_file,
 };
-use pageserver_api::{key::Key, shard::TenantShardId};
+use pageserver_api::{key::Key, shard::TenantShardId, value::Value};
 use utils::{
     bin_ser::BeSer,
     id::{TenantId, TimelineId},
diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs
index 1353e79f7c..5c5b52db44 100644
--- a/pageserver/benches/bench_layer_map.rs
+++ b/pageserver/benches/bench_layer_map.rs
@@ -1,9 +1,9 @@
 use criterion::measurement::WallTime;
 use pageserver::keyspace::{KeyPartitioning, KeySpace};
-use pageserver::repository::Key;
 use pageserver::tenant::layer_map::LayerMap;
 use pageserver::tenant::storage_layer::LayerName;
 use pageserver::tenant::storage_layer::PersistentLayerDesc;
+use pageserver_api::key::Key;
 use pageserver_api::shard::TenantShardId;
 use rand::prelude::{SeedableRng, SliceRandom, StdRng};
 use std::cmp::{max, min};
diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs
index 45936cb3fa..d3551b56e1 100644
--- a/pageserver/benches/bench_walredo.rs
+++ b/pageserver/benches/bench_walredo.rs
@@ -60,7 +60,8 @@ use anyhow::Context;
 use bytes::{Buf, Bytes};
 use criterion::{BenchmarkId, Criterion};
 use once_cell::sync::Lazy;
-use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
+use pageserver::{config::PageServerConf, walredo::PostgresRedoManager};
+use pageserver_api::record::NeonWalRecord;
 use pageserver_api::{key::Key, shard::TenantShardId};
 use std::{
     future::Future,
diff --git a/pageserver/ctl/src/draw_timeline_dir.rs b/pageserver/ctl/src/draw_timeline_dir.rs
index bc939f9688..177e65ef79 100644
--- a/pageserver/ctl/src/draw_timeline_dir.rs
+++ b/pageserver/ctl/src/draw_timeline_dir.rs
@@ -51,7 +51,7 @@
 //!
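// Editor's aside — a sketch (not part of this patch) of the import-path change
// these hunks apply across the tree: `Key` now comes from `pageserver_api::key`
// instead of the removed `pageserver::repository`, with an unchanged API. The
// `parse_key` wrapper is hypothetical.
use pageserver_api::key::Key; // was: use pageserver::repository::Key;

fn parse_key(s: &str) -> anyhow::Result<Key> {
    // `from_hex` still parses the hex key format used in layer names and
    // debug output; only the crate providing the type moved.
    Key::from_hex(s)
}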
use anyhow::{Context, Result}; -use pageserver::repository::Key; +use pageserver_api::key::Key; use std::cmp::Ordering; use std::io::{self, BufRead}; use std::path::PathBuf; diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 7dd2a5d05c..451d2a1d69 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -14,12 +14,12 @@ use std::ops::Range; use std::{fs, str}; use pageserver::page_cache::{self, PAGE_SZ}; -use pageserver::repository::{Key, KEY_SIZE}; use pageserver::tenant::block_io::FileBlockReader; use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection}; use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE}; use pageserver::tenant::storage_layer::range_overlaps; use pageserver::virtual_file::{self, VirtualFile}; +use pageserver_api::key::{Key, KEY_SIZE}; use utils::{bin_ser::BeSer, lsn::Lsn}; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index c0b2b6ae89..2e6a5f9d8a 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -12,15 +12,15 @@ use pageserver::tenant::storage_layer::{delta_layer, image_layer}; use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer}; use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use pageserver::virtual_file::api::IoMode; -use pageserver::{page_cache, virtual_file}; use pageserver::{ - repository::{Key, KEY_SIZE}, tenant::{ block_io::FileBlockReader, disk_btree::VisitDirection, storage_layer::delta_layer::DELTA_KEY_SIZE, }, virtual_file::VirtualFile, }; +use pageserver_api::key::{Key, KEY_SIZE}; +use pageserver::{page_cache, virtual_file}; use std::fs; use utils::bin_ser::BeSer; use utils::id::{TenantId, TimelineId}; diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 73bdc90213..7733bdb640 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -696,7 +696,7 @@ impl DeletionQueue { mod test { use camino::Utf8Path; use hex_literal::hex; - use pageserver_api::{shard::ShardIndex, upcall_api::ReAttachResponseTenant}; + use pageserver_api::{key::Key, shard::ShardIndex, upcall_api::ReAttachResponseTenant}; use std::{io::ErrorKind, time::Duration}; use tracing::info; @@ -705,7 +705,6 @@ mod test { use crate::{ controller_upcall_client::RetryForeverError, - repository::Key, tenant::{harness::TenantHarness, storage_layer::DeltaLayerName}, }; diff --git a/pageserver/src/gc_result.rs b/pageserver/src/gc_result.rs new file mode 100644 index 0000000000..c805aafeab --- /dev/null +++ b/pageserver/src/gc_result.rs @@ -0,0 +1,57 @@ +use anyhow::Result; +use serde::Serialize; +use std::ops::AddAssign; +use std::time::Duration; + +/// +/// Result of performing GC +/// +#[derive(Default, Serialize, Debug)] +pub struct GcResult { + pub layers_total: u64, + pub layers_needed_by_cutoff: u64, + pub layers_needed_by_pitr: u64, + pub layers_needed_by_branches: u64, + pub layers_needed_by_leases: u64, + pub layers_not_updated: u64, + pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files. + + #[serde(serialize_with = "serialize_duration_as_millis")] + pub elapsed: Duration, + + /// The layers which were garbage collected. + /// + /// Used in `/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc` to wait for the layers to be + /// dropped in tests. 
+    #[cfg(feature = "testing")]
+    #[serde(skip)]
+    pub(crate) doomed_layers: Vec<crate::tenant::storage_layer::Layer>,
+}
+
+// helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds
+fn serialize_duration_as_millis<S>(d: &Duration, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: serde::Serializer,
+{
+    d.as_millis().serialize(serializer)
+}
+
+impl AddAssign for GcResult {
+    fn add_assign(&mut self, other: Self) {
+        self.layers_total += other.layers_total;
+        self.layers_needed_by_pitr += other.layers_needed_by_pitr;
+        self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
+        self.layers_needed_by_branches += other.layers_needed_by_branches;
+        self.layers_needed_by_leases += other.layers_needed_by_leases;
+        self.layers_not_updated += other.layers_not_updated;
+        self.layers_removed += other.layers_removed;
+
+        self.elapsed += other.elapsed;
+
+        #[cfg(feature = "testing")]
+        {
+            let mut other = other;
+            self.doomed_layers.append(&mut other.doomed_layers);
+        }
+    }
+}
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 2490bf5f20..12d842af88 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -2129,13 +2129,13 @@ async fn getpage_at_lsn_handler(
     check_permission(&request, Some(tenant_shard_id.tenant_id))?;
     let state = get_state(&request);
 
-    struct Key(crate::repository::Key);
+    struct Key(pageserver_api::key::Key);
 
     impl std::str::FromStr for Key {
         type Err = anyhow::Error;
 
         fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-            crate::repository::Key::from_hex(s).map(Key)
+            pageserver_api::key::Key::from_hex(s).map(Key)
         }
     }
 
diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index ca87f1d080..f92a6bd70a 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -19,10 +19,9 @@ use crate::metrics::WAL_INGEST;
 use crate::pgdatadir_mapping::*;
 use crate::tenant::Timeline;
 use crate::walingest::WalIngest;
-use crate::walrecord::decode_wal_record;
-use crate::walrecord::DecodedWALRecord;
 use pageserver_api::reltag::{RelTag, SlruKind};
 use postgres_ffi::pg_constants;
+use postgres_ffi::record::{decode_wal_record, DecodedWALRecord};
 use postgres_ffi::relfile_utils::*;
 use postgres_ffi::waldecoder::WalStreamDecoder;
 use postgres_ffi::ControlFileData;
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index d51931c768..8e78c6124d 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -20,11 +20,11 @@ pub use pageserver_api::keyspace;
 use tokio_util::sync::CancellationToken;
 mod assert_u64_eq_usize;
 pub mod aux_file;
+pub mod gc_result;
 pub mod metrics;
 pub mod page_cache;
 pub mod page_service;
 pub mod pgdatadir_mapping;
-pub mod repository;
 pub mod span;
 pub(crate) mod statvfs;
 pub mod task_mgr;
@@ -32,7 +32,6 @@ pub mod tenant;
 pub mod utilization;
 pub mod virtual_file;
 pub mod walingest;
-pub mod walrecord;
 pub mod walredo;
 
 use camino::Utf8Path;
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index f2a11e65c1..60d4ce3ed4 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -7,11 +7,10 @@
 //! Clarify that)
 //!
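The `AddAssign` impl on the new `GcResult` above is what lets per-timeline GC results be folded into a tenant-wide total with `+=`, which is how the `tenant.rs` and `mgr.rs` call sites touched later in this patch consume it. A minimal sketch of that aggregation pattern, assuming only the `pageserver::gc_result` module path introduced here (the surrounding function is hypothetical):

    use pageserver::gc_result::GcResult;

    /// Fold per-timeline GC results into one tenant-wide summary.
    fn aggregate(per_timeline: Vec<GcResult>) -> GcResult {
        let mut total = GcResult::default();
        for r in per_timeline {
            // `+=` sums the layer counters and the elapsed time; under the
            // `testing` feature it also appends the doomed layers.
            total += r;
        }
        total
    }
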
 use super::tenant::{PageReconstructError, Timeline};
+use crate::aux_file;
 use crate::context::RequestContext;
 use crate::keyspace::{KeySpace, KeySpaceAccum};
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
-use crate::walrecord::NeonWalRecord;
-use crate::{aux_file, repository::*};
 use anyhow::{ensure, Context};
 use bytes::{Buf, Bytes, BytesMut};
 use enum_map::Enum;
@@ -22,7 +21,10 @@ use pageserver_api::key::{
     CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
 };
+use pageserver_api::key::Key;
 use pageserver_api::keyspace::SparseKeySpace;
+use pageserver_api::record::NeonWalRecord;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
+use pageserver_api::value::Value;
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::BLCKSZ;
 use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index d503b299c1..22866e11c2 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -84,6 +84,7 @@ use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::deletion_queue::DeletionQueueClient;
 use crate::deletion_queue::DeletionQueueError;
+use crate::gc_result::GcResult;
 use crate::import_datadir;
 use crate::is_uninit_mark;
 use crate::l0_flush::L0FlushGlobalState;
@@ -92,7 +93,6 @@ use crate::metrics::{
     remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN,
     TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
 };
-use crate::repository::GcResult;
 use crate::task_mgr;
 use crate::task_mgr::TaskKind;
 use crate::tenant::config::LocationMode;
@@ -467,10 +467,10 @@ impl WalRedoManager {
     /// This method is cancellation-safe.
pub async fn request_redo( &self, - key: crate::repository::Key, + key: pageserver_api::key::Key, lsn: Lsn, base_img: Option<(Lsn, bytes::Bytes)>, - records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>, + records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>, pg_version: u32, ) -> Result { match self { @@ -4528,7 +4528,8 @@ pub(crate) mod harness { use crate::deletion_queue::mock::MockDeletionQueue; use crate::l0_flush::L0FlushConfig; use crate::walredo::apply_neon; - use crate::{repository::Key, walrecord::NeonWalRecord}; + use pageserver_api::key::Key; + use pageserver_api::record::NeonWalRecord; use super::*; use hex_literal::hex; @@ -4798,17 +4799,17 @@ mod tests { use super::*; use crate::keyspace::KeySpaceAccum; - use crate::repository::{Key, Value}; use crate::tenant::harness::*; use crate::tenant::timeline::CompactFlags; - use crate::walrecord::NeonWalRecord; use crate::DEFAULT_PG_VERSION; use bytes::{Bytes, BytesMut}; use hex_literal::hex; use itertools::Itertools; - use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE}; + use pageserver_api::key::{Key, AUX_KEY_PREFIX, NON_INHERITED_RANGE}; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings}; + use pageserver_api::record::NeonWalRecord; + use pageserver_api::value::Value; use rand::{thread_rng, Rng}; use storage_layer::PersistentLayerKey; use tests::storage_layer::ValuesReconstructState; diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 707233b003..7f15baed10 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -48,9 +48,9 @@ mod layer_coverage; use crate::context::RequestContext; use crate::keyspace::KeyPartitioning; -use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use anyhow::Result; +use pageserver_api::key::Key; use pageserver_api::keyspace::{KeySpace, KeySpaceAccum}; use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze}; use std::collections::{HashMap, VecDeque}; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 0567f8f3a7..b5df4c7a26 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -2811,7 +2811,7 @@ where } use { - crate::repository::GcResult, pageserver_api::models::TimelineGcRequest, + crate::gc_result::GcResult, pageserver_api::models::TimelineGcRequest, utils::http::error::ApiError, }; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index a229b59560..309edf9d0e 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -11,11 +11,11 @@ pub mod merge_iterator; pub mod split_writer; use crate::context::{AccessStatsBehavior, RequestContext}; -use crate::repository::Value; -use crate::walrecord::NeonWalRecord; use bytes::Bytes; use pageserver_api::key::Key; use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; +use pageserver_api::record::NeonWalRecord; +use pageserver_api::value::Value; use std::cmp::{Ordering, Reverse}; use std::collections::hash_map::Entry; use std::collections::{BinaryHeap, HashMap}; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index ceae1d4b1a..c0a41daad0 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -30,7 +30,6 @@ use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, 
RequestContextBuilder};
 use crate::page_cache::{self, FileId, PAGE_SZ};
-use crate::repository::{Key, Value, KEY_SIZE};
 use crate::tenant::blob_io::BlobWriter;
 use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
 use crate::tenant::disk_btree::{
@@ -46,7 +45,7 @@ use crate::tenant::PageReconstructError;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
 use crate::virtual_file::IoBufferMut;
 use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
-use crate::{walrecord, TEMP_FILE_SUFFIX};
+use crate::TEMP_FILE_SUFFIX;
 use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use camino::{Utf8Path, Utf8PathBuf};
@@ -54,9 +53,11 @@ use futures::StreamExt;
 use itertools::Itertools;
 use pageserver_api::config::MaxVectoredReadBytes;
 use pageserver_api::key::DBDIR_KEY;
+use pageserver_api::key::{Key, KEY_SIZE};
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::ImageCompressionAlgorithm;
 use pageserver_api::shard::TenantShardId;
+use pageserver_api::value::Value;
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::collections::VecDeque;
@@ -1293,7 +1294,7 @@ impl DeltaLayerInner {
             // is it an image or will_init walrecord?
             // FIXME: this could be handled by threading the BlobRef to the
             // VectoredReadBuilder
-            let will_init = crate::repository::ValueBytes::will_init(&data)
+            let will_init = pageserver_api::value::ValueBytes::will_init(&data)
                 .inspect_err(|_e| {
                     #[cfg(feature = "testing")]
                     tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
@@ -1356,7 +1357,7 @@ impl DeltaLayerInner {
                         format!(" img {} bytes", img.len())
                     }
                     Value::WalRecord(rec) => {
-                        let wal_desc = walrecord::describe_wal_record(&rec)?;
+                        let wal_desc = pageserver_api::record::describe_wal_record(&rec)?;
                         format!(
                             " rec {} bytes will_init: {} {}",
                             buf.len(),
@@ -1600,7 +1601,6 @@ pub(crate) mod test {
     use rand::RngCore;
 
     use super::*;
-    use crate::repository::Value;
     use crate::tenant::harness::TIMELINE_ID;
     use crate::tenant::storage_layer::{Layer, ResidentLayer};
     use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
@@ -1612,6 +1612,7 @@ pub(crate) mod test {
         DEFAULT_PG_VERSION,
     };
     use bytes::Bytes;
+    use pageserver_api::value::Value;
 
     /// Construct an index for a fictional delta layer and then
     /// traverse in order to plan vectored reads for a query. Finally,
@@ -1964,8 +1965,8 @@ pub(crate) mod test {
 
     #[tokio::test]
     async fn copy_delta_prefix_smoke() {
-        use crate::walrecord::NeonWalRecord;
         use bytes::Bytes;
+        use pageserver_api::record::NeonWalRecord;
 
         let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
             .await
diff --git a/pageserver/src/tenant/storage_layer/filter_iterator.rs b/pageserver/src/tenant/storage_layer/filter_iterator.rs
index f45dd4b801..ccfcf68e8f 100644
--- a/pageserver/src/tenant/storage_layer/filter_iterator.rs
+++ b/pageserver/src/tenant/storage_layer/filter_iterator.rs
@@ -7,7 +7,7 @@ use pageserver_api::{
 };
 use utils::lsn::Lsn;
 
-use crate::repository::Value;
+use pageserver_api::value::Value;
 
 use super::merge_iterator::MergeIterator;
 
@@ -121,8 +121,8 @@ mod tests {
 
     #[tokio::test]
     async fn filter_keyspace_iterator() {
-        use crate::repository::Value;
         use bytes::Bytes;
+        use pageserver_api::value::Value;
 
         let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator")
             .await
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index fa058833d4..be61222e51 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -28,7 +28,6 @@ use crate::config::PageServerConf;
 use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
 use crate::page_cache::{self, FileId, PAGE_SZ};
-use crate::repository::{Key, Value, KEY_SIZE};
 use crate::tenant::blob_io::BlobWriter;
 use crate::tenant::block_io::{BlockBuf, FileBlockReader};
 use crate::tenant::disk_btree::{
@@ -51,8 +50,10 @@ use hex;
 use itertools::Itertools;
 use pageserver_api::config::MaxVectoredReadBytes;
 use pageserver_api::key::DBDIR_KEY;
+use pageserver_api::key::{Key, KEY_SIZE};
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::shard::{ShardIdentity, TenantShardId};
+use pageserver_api::value::Value;
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::collections::VecDeque;
@@ -1109,6 +1110,7 @@ mod test {
     use itertools::Itertools;
     use pageserver_api::{
         key::Key,
         shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
+        value::Value,
     };
     use utils::{
@@ -1119,7 +1121,6 @@
 
     use crate::{
         context::RequestContext,
-        repository::Value,
         tenant::{
             config::TenantConf,
             harness::{TenantHarness, TIMELINE_ID},
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 7573ddb5cc..df448a0963 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -7,7 +7,6 @@
 use crate::assert_u64_eq_usize::{u64_to_usize, U64IsUsize, UsizeIsU64};
 use crate::config::PageServerConf;
 use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
-use crate::repository::{Key, Value};
 use crate::tenant::ephemeral_file::EphemeralFile;
 use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::PageReconstructError;
@@ -16,9 +15,11 @@ use crate::{l0_flush, page_cache};
 use anyhow::{anyhow, Context, Result};
 use camino::Utf8PathBuf;
 use pageserver_api::key::CompactKey;
+use pageserver_api::key::Key;
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::InMemoryLayerInfo;
 use pageserver_api::shard::TenantShardId;
+use pageserver_api::value::Value;
 use std::collections::{BTreeMap, HashMap};
 use std::sync::{Arc, OnceLock};
 use std::time::Instant;
diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs
index 9de70f14ee..36dcc8d805 100644
--- a/pageserver/src/tenant/storage_layer/layer/tests.rs
+++ b/pageserver/src/tenant/storage_layer/layer/tests.rs
@@ -760,8 +760,8 @@ async fn evict_and_wait_does_not_wait_for_download() {
 /// Also checks that the same does not happen on a non-evicted layer (regression test).
 #[tokio::test(start_paused = true)]
 async fn eviction_cancellation_on_drop() {
-    use crate::repository::Value;
     use bytes::Bytes;
+    use pageserver_api::value::Value;
 
     // this is the runtime on which Layer spawns the blocking tasks on
     let handle = tokio::runtime::Handle::current();
@@ -782,7 +782,7 @@ async fn eviction_cancellation_on_drop() {
         let mut writer = timeline.writer().await;
         writer
             .put(
-                crate::repository::Key::from_i128(5),
+                pageserver_api::key::Key::from_i128(5),
                 Lsn(0x20),
                 &Value::Image(Bytes::from_static(b"this does not matter either")),
                 &ctx,
diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs
index e90ff3c4b2..de7765c92f 100644
--- a/pageserver/src/tenant/storage_layer/layer_desc.rs
+++ b/pageserver/src/tenant/storage_layer/layer_desc.rs
@@ -3,7 +3,7 @@ use pageserver_api::shard::TenantShardId;
 use std::ops::Range;
 use utils::{id::TimelineId, lsn::Lsn};
 
-use crate::repository::Key;
+use pageserver_api::key::Key;
 
 use super::{DeltaLayerName, ImageLayerName, LayerName};
 
diff --git a/pageserver/src/tenant/storage_layer/layer_name.rs b/pageserver/src/tenant/storage_layer/layer_name.rs
index 8e750e1187..2b98d74f9f 100644
--- a/pageserver/src/tenant/storage_layer/layer_name.rs
+++ b/pageserver/src/tenant/storage_layer/layer_name.rs
@@ -1,7 +1,7 @@
 //!
 //! Helper functions for dealing with filenames of the image and delta layer files.
 //!
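These hunks are mechanical: the `Key` type and its API are unchanged, only the import path moves from `pageserver::repository` to `pageserver_api::key`. A small sketch of the pattern the touched tests rely on, under that assumption (the function and values are illustrative, not part of this diff):

    use pageserver_api::key::Key;

    fn key_roundtrip() {
        // Same constructor/accessor pair as before the move; `from_i128` is
        // used by the layer tests above, `from_hex` by the HTTP handler
        // earlier in the patch. Only the `use` path changed.
        let key = Key::from_i128(5);
        assert_eq!(key, Key::from_i128(key.to_i128()));
    }
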
-use crate::repository::Key; +use pageserver_api::key::Key; use std::borrow::Cow; use std::cmp::Ordering; use std::fmt; diff --git a/pageserver/src/tenant/storage_layer/merge_iterator.rs b/pageserver/src/tenant/storage_layer/merge_iterator.rs index f91e27241d..43c2244d3c 100644 --- a/pageserver/src/tenant/storage_layer/merge_iterator.rs +++ b/pageserver/src/tenant/storage_layer/merge_iterator.rs @@ -7,7 +7,8 @@ use anyhow::bail; use pageserver_api::key::Key; use utils::lsn::Lsn; -use crate::{context::RequestContext, repository::Value}; +use crate::context::RequestContext; +use pageserver_api::value::Value; use super::{ delta_layer::{DeltaLayerInner, DeltaLayerIterator}, @@ -293,9 +294,9 @@ mod tests { harness::{TenantHarness, TIMELINE_ID}, storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value}, }, - walrecord::NeonWalRecord, DEFAULT_PG_VERSION, }; + use pageserver_api::record::NeonWalRecord; async fn assert_merge_iter_equal( merge_iter: &mut MergeIterator<'_>, @@ -319,8 +320,8 @@ mod tests { #[tokio::test] async fn merge_in_between() { - use crate::repository::Value; use bytes::Bytes; + use pageserver_api::value::Value; let harness = TenantHarness::create("merge_iterator_merge_in_between") .await @@ -384,8 +385,8 @@ mod tests { #[tokio::test] async fn delta_merge() { - use crate::repository::Value; use bytes::Bytes; + use pageserver_api::value::Value; let harness = TenantHarness::create("merge_iterator_delta_merge") .await @@ -460,8 +461,8 @@ mod tests { #[tokio::test] async fn delta_image_mixed_merge() { - use crate::repository::Value; use bytes::Bytes; + use pageserver_api::value::Value; let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge") .await diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index 45ac0c6668..3a6869eb9c 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -5,7 +5,8 @@ use pageserver_api::key::{Key, KEY_SIZE}; use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId}; use crate::tenant::storage_layer::Layer; -use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline}; +use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline}; +use pageserver_api::value::Value; use super::layer::S3_UPLOAD_LIMIT; use super::{ diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7b40a24c54..0c901e2a7a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -125,11 +125,12 @@ use utils::{ simple_rcu::{Rcu, RcuReadGuard}, }; -use crate::repository::GcResult; -use crate::repository::{Key, Value}; +use crate::gc_result::GcResult; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::ZERO_PAGE; +use pageserver_api::key::Key; +use pageserver_api::value::Value; use self::delete::DeleteTimelineFlow; pub(super) use self::eviction_task::EvictionTaskTenantState; @@ -5816,17 +5817,15 @@ fn is_send() { #[cfg(test)] mod tests { use pageserver_api::key::Key; + use pageserver_api::value::Value; use utils::{id::TimelineId, lsn::Lsn}; - use crate::{ - repository::Value, - tenant::{ - harness::{test_img, TenantHarness}, - layer_map::LayerMap, - storage_layer::{Layer, LayerName}, - timeline::{DeltaLayerTestDesc, EvictionError}, - Timeline, - }, + use crate::tenant::{ + harness::{test_img, TenantHarness}, + layer_map::LayerMap, + storage_layer::{Layer, LayerName}, + 
timeline::{DeltaLayerTestDesc, EvictionError}, + Timeline, }; #[tokio::test] diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 37d907ddcb..63f2bbf472 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -49,9 +49,10 @@ use pageserver_api::config::tenant_conf_defaults::{ DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD, }; -use crate::keyspace::KeySpace; -use crate::repository::{Key, Value}; -use crate::walrecord::NeonWalRecord; +use pageserver_api::key::Key; +use pageserver_api::keyspace::KeySpace; +use pageserver_api::record::NeonWalRecord; +use pageserver_api::value::Value; use utils::lsn::Lsn; @@ -2143,7 +2144,7 @@ struct ResidentDeltaLayer(ResidentLayer); struct ResidentImageLayer(ResidentLayer); impl CompactionJobExecutor for TimelineAdaptor { - type Key = crate::repository::Key; + type Key = pageserver_api::key::Key; type Layer = OwnArc; type DeltaLayer = ResidentDeltaLayer; diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index cee259e2e0..dfa8ad2ffa 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -31,10 +31,10 @@ use crate::{ task_mgr::{TaskKind, WALRECEIVER_RUNTIME}, tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo}, walingest::WalIngest, - walrecord::{decode_wal_record, DecodedWALRecord}, }; use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; +use postgres_ffi::record::{decode_wal_record, DecodedWALRecord}; use postgres_ffi::waldecoder::WalStreamDecoder; use utils::{id::NodeId, lsn::Lsn}; use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError}; @@ -343,6 +343,7 @@ pub(super) async fn handle_walreceiver_connection( let mut decoded = DecodedWALRecord::default(); decode_wal_record(recdata, &mut decoded, modification.tline.pg_version)?; + // TODO: Handle this. Probably flush buf + data modifications early. 
         if decoded.is_dbase_create_copy(timeline.pg_version) && uncommitted_records > 0 {
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 8a4c0554f8..ee62e00caa 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -28,8 +28,10 @@ use std::time::Instant;
 use std::time::SystemTime;
 
 use pageserver_api::shard::ShardIdentity;
+use postgres_ffi::record::*;
 use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
 use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
+use wal_decoder::models::*;
 
 use anyhow::{bail, Context, Result};
 use bytes::{Buf, Bytes, BytesMut};
@@ -43,9 +45,9 @@ use crate::pgdatadir_mapping::{DatadirModification, Version};
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::tenant::PageReconstructError;
 use crate::tenant::Timeline;
-use crate::walrecord::*;
 use crate::ZERO_PAGE;
 use pageserver_api::key::rel_block_to_key;
+use pageserver_api::record::NeonWalRecord;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
 use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
@@ -107,143 +109,6 @@ struct WarnIngestLag {
     timestamp_invalid_msg_ratelimit: RateLimit,
 }
 
-// These structs are an intermediary representation of the PostgreSQL WAL records.
-// The ones prefixed with `Xl` are lower level, while the ones that are not have
-// all the required context to be acted upon by the pageserver.
-
-enum HeapamRecord {
-    ClearVmBits(ClearVmBits),
-}
-
-struct ClearVmBits {
-    new_heap_blkno: Option<u32>,
-    old_heap_blkno: Option<u32>,
-    vm_rel: RelTag,
-    flags: u8,
-}
-
-enum NeonrmgrRecord {
-    ClearVmBits(ClearVmBits),
-}
-
-enum SmgrRecord {
-    Create(SmgrCreate),
-    Truncate(XlSmgrTruncate),
-}
-
-struct SmgrCreate {
-    rel: RelTag,
-}
-
-enum DbaseRecord {
-    Create(DbaseCreate),
-    Drop(DbaseDrop),
-}
-
-struct DbaseCreate {
-    db_id: u32,
-    tablespace_id: u32,
-    src_db_id: u32,
-    src_tablespace_id: u32,
-}
-
-struct DbaseDrop {
-    db_id: u32,
-    tablespace_ids: Vec<Oid>,
-}
-
-enum ClogRecord {
-    ZeroPage(ClogZeroPage),
-    Truncate(ClogTruncate),
-}
-
-struct ClogZeroPage {
-    segno: u32,
-    rpageno: u32,
-}
-
-struct ClogTruncate {
-    pageno: u32,
-    oldest_xid: u32,
-    oldest_xid_db: u32,
-}
-
-enum XactRecord {
-    Commit(XactCommon),
-    Abort(XactCommon),
-    CommitPrepared(XactCommon),
-    AbortPrepared(XactCommon),
-    Prepare(XactPrepare),
-}
-
-struct XactCommon {
-    parsed: XlXactParsedRecord,
-    origin_id: u16,
-    // Fields below are only used for logging
-    xl_xid: u32,
-    lsn: Lsn,
-}
-
-struct XactPrepare {
-    xl_xid: u32,
-    data: Bytes,
-}
-
-enum MultiXactRecord {
-    ZeroPage(MultiXactZeroPage),
-    Create(XlMultiXactCreate),
-    Truncate(XlMultiXactTruncate),
-}
-
-struct MultiXactZeroPage {
-    slru_kind: SlruKind,
-    segno: u32,
-    rpageno: u32,
-}
-
-enum RelmapRecord {
-    Update(RelmapUpdate),
-}
-
-struct RelmapUpdate {
-    update: XlRelmapUpdate,
-    buf: Bytes,
-}
-
-enum XlogRecord {
-    Raw(RawXlogRecord),
-}
-
-struct RawXlogRecord {
-    info: u8,
-    lsn: Lsn,
-    buf: Bytes,
-}
-
-enum LogicalMessageRecord {
-    Put(PutLogicalMessage),
-    #[cfg(feature = "testing")]
-    Failpoint,
-}
-
-struct PutLogicalMessage {
-    path: String,
-    buf: Bytes,
-}
-
-enum StandbyRecord {
-    RunningXacts(StandbyRunningXacts),
-}
-
-struct StandbyRunningXacts {
-    oldest_running_xid: u32,
-}
-
-enum ReploriginRecord {
-    Set(XlReploriginSet),
-    Drop(XlReploriginDrop),
-}
-
 impl WalIngest {
     pub async fn new(
         timeline: &Timeline,
@@ -283,7 +148,6 @@ impl WalIngest {
     /// relations/pages that the record affects.
     ///
     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
-    ///
     pub async fn ingest_record(
         &mut self,
         decoded: DecodedWALRecord,
@@ -2210,7 +2074,7 @@ impl WalIngest {
     ) -> anyhow::Result<Option<LogicalMessageRecord>> {
         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
         if info == pg_constants::XLOG_LOGICAL_MESSAGE {
-            let xlrec = crate::walrecord::XlLogicalMessage::decode(buf);
+            let xlrec = postgres_ffi::record::XlLogicalMessage::decode(buf);
             let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
 
             #[cfg(feature = "testing")]
@@ -2238,7 +2102,7 @@ impl WalIngest {
     ) -> anyhow::Result<Option<StandbyRecord>> {
         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
         if info == pg_constants::XLOG_RUNNING_XACTS {
-            let xlrec = crate::walrecord::XlRunningXacts::decode(buf);
+            let xlrec = XlRunningXacts::decode(buf);
             return Ok(Some(StandbyRecord::RunningXacts(StandbyRunningXacts {
                 oldest_running_xid: xlrec.oldest_running_xid,
             })));
@@ -2268,10 +2132,10 @@ impl WalIngest {
     ) -> anyhow::Result<Option<ReploriginRecord>> {
         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
         if info == pg_constants::XLOG_REPLORIGIN_SET {
-            let xlrec = crate::walrecord::XlReploriginSet::decode(buf);
+            let xlrec = XlReploriginSet::decode(buf);
             return Ok(Some(ReploriginRecord::Set(xlrec)));
         } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
-            let xlrec = crate::walrecord::XlReploriginDrop::decode(buf);
+            let xlrec = XlReploriginDrop::decode(buf);
             return Ok(Some(ReploriginRecord::Drop(xlrec)));
         }
 
@@ -3092,6 +2956,7 @@ mod tests {
     #[tokio::test]
     async fn test_ingest_real_wal() {
         use crate::tenant::harness::*;
+        use postgres_ffi::record::decode_wal_record;
         use postgres_ffi::waldecoder::WalStreamDecoder;
         use postgres_ffi::WAL_SEGMENT_SIZE;
 
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index a1c9fc5651..027a6eb7d7 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -29,11 +29,11 @@ use crate::metrics::{
     WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
     WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
 };
-use crate::repository::Key;
-use crate::walrecord::NeonWalRecord;
 use anyhow::Context;
 use bytes::{Bytes, BytesMut};
+use pageserver_api::key::Key;
 use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
+use pageserver_api::record::NeonWalRecord;
 use pageserver_api::shard::TenantShardId;
 use std::future::Future;
 use std::sync::Arc;
@@ -548,9 +548,10 @@ impl PostgresRedoManager {
 #[cfg(test)]
 mod tests {
     use super::PostgresRedoManager;
-    use crate::repository::Key;
-    use crate::{config::PageServerConf, walrecord::NeonWalRecord};
+    use crate::config::PageServerConf;
     use bytes::Bytes;
+    use pageserver_api::key::Key;
+    use pageserver_api::record::NeonWalRecord;
     use pageserver_api::shard::TenantShardId;
     use std::str::FromStr;
     use tracing::Instrument;
diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs
index c067787f97..7aaa357318 100644
--- a/pageserver/src/walredo/apply_neon.rs
+++ b/pageserver/src/walredo/apply_neon.rs
@@ -1,8 +1,8 @@
-use crate::walrecord::NeonWalRecord;
 use anyhow::Context;
 use byteorder::{ByteOrder, LittleEndian};
 use bytes::BytesMut;
 use pageserver_api::key::Key;
+use pageserver_api::record::NeonWalRecord;
 use pageserver_api::reltag::SlruKind;
 use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
@@ -238,7 +238,7 @@ pub(crate) fn apply_in_neon(
             // No-op: this record will never be created in aux v2.
             warn!("AuxFile record should not be created in aux v2");
         }
-        #[cfg(test)]
+        #[cfg(feature = "testing")]
        NeonWalRecord::Test {
             append,
             clear,
diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs
index f3197e68b5..7e9477cfbc 100644
--- a/pageserver/src/walredo/process.rs
+++ b/pageserver/src/walredo/process.rs
@@ -8,10 +8,10 @@ use crate::{
     metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
     page_cache::PAGE_SZ,
     span::debug_assert_current_span_has_tenant_id,
-    walrecord::NeonWalRecord,
 };
 use anyhow::Context;
 use bytes::Bytes;
+use pageserver_api::record::NeonWalRecord;
 use pageserver_api::{reltag::RelTag, shard::TenantShardId};
 use postgres_ffi::BLCKSZ;
 #[cfg(feature = "testing")]
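Taken together, the moves give downstream code three stable homes for these types: keys and the value/record model in `pageserver_api`, and the PG-specific decoding primitives in `postgres_ffi::record`. A sketch of what a typical caller looks like after this patch, using only the signatures visible in the hunks above (the helper function itself is hypothetical, not part of this diff):

    use anyhow::Result;
    use bytes::Bytes;
    use pageserver_api::record::NeonWalRecord; // was pageserver::walrecord
    use pageserver_api::value::Value;          // was pageserver::repository
    use postgres_ffi::record::{decode_wal_record, DecodedWALRecord}; // was pageserver::walrecord

    /// Decode one raw WAL record and wrap it the way the ingest path does.
    fn decode_one(recdata: Bytes, pg_version: u32) -> Result<(DecodedWALRecord, Value)> {
        // Same decoding entry point as before; it now lives in postgres_ffi
        // so that the upcoming wal_decoder crate can call it without a
        // circular dependency.
        let mut decoded = DecodedWALRecord::default();
        decode_wal_record(recdata.clone(), &mut decoded, pg_version)?;
        // A native Postgres WAL record travels through the pageserver as a
        // NeonWalRecord::Postgres wrapped in Value::WalRecord.
        let value = Value::WalRecord(NeonWalRecord::Postgres {
            will_init: false,
            rec: recdata,
        });
        Ok((decoded, value))
    }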