Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-08 21:20:38 +00:00

Compare commits: 5 commits (add_audit_… → erik/crite…)

| SHA1 |
|---|
| df58da66d2 |
| f4b8795cf8 |
| 45fe1236ab |
| 6eaf9b9a48 |
| 270a9dd91f |
Cargo.lock (generated), 152 lines changed
@@ -146,6 +146,12 @@ dependencies = [
 "static_assertions",
]

[[package]]
name = "arrayvec"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"

[[package]]
name = "asn1-rs"
version = "0.6.2"
@@ -769,7 +775,7 @@ dependencies = [
 "once_cell",
 "paste",
 "pin-project",
 "quick-xml",
 "quick-xml 0.31.0",
 "rand 0.8.5",
 "reqwest 0.11.19",
 "rustc_version",
@@ -951,7 +957,7 @@ dependencies = [
 "bitflags 2.4.1",
 "cexpr",
 "clang-sys",
 "itertools 0.10.5",
 "itertools 0.12.1",
 "lazy_static",
 "lazycell",
 "log",
@@ -974,7 +980,7 @@ dependencies = [
 "bitflags 2.4.1",
 "cexpr",
 "clang-sys",
 "itertools 0.10.5",
 "itertools 0.12.1",
 "log",
 "prettyplease",
 "proc-macro2",
@@ -1437,6 +1443,15 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"

[[package]]
name = "cpp_demangle"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d"
dependencies = [
 "cfg-if",
]

[[package]]
name = "cpufeatures"
version = "0.2.9"
@@ -2073,6 +2088,18 @@ dependencies = [
 "windows-sys 0.48.0",
]

[[package]]
name = "findshlibs"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
dependencies = [
 "cc",
 "lazy_static",
 "libc",
 "winapi",
]

[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -2791,6 +2818,24 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac"

[[package]]
name = "inferno"
version = "0.11.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
dependencies = [
 "ahash",
 "indexmap 2.0.1",
 "is-terminal",
 "itoa",
 "log",
 "num-format",
 "once_cell",
 "quick-xml 0.26.0",
 "rgb",
 "str_stack",
]

[[package]]
name = "inotify"
version = "0.9.6"
@@ -3136,6 +3181,15 @@ version = "2.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"

[[package]]
name = "memmap2"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45fd3a57831bf88bc63f8cebc0cf956116276e97fef3966103e96416209f7c92"
dependencies = [
 "libc",
]

[[package]]
name = "memoffset"
version = "0.7.1"
@@ -3367,6 +3421,16 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"

[[package]]
name = "num-format"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
dependencies = [
 "arrayvec",
 "itoa",
]

[[package]]
name = "num-integer"
version = "0.1.45"
@@ -4197,6 +4261,28 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"

[[package]]
name = "pprof"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef5c97c51bd34c7e742402e216abdeb44d415fbe6ae41d56b114723e953711cb"
dependencies = [
 "backtrace",
 "cfg-if",
 "criterion",
 "findshlibs",
 "inferno",
 "libc",
 "log",
 "nix 0.26.4",
 "once_cell",
 "parking_lot 0.12.1",
 "smallvec",
 "symbolic-demangle",
 "tempfile",
 "thiserror",
]

[[package]]
name = "ppv-lite86"
version = "0.2.17"
@@ -4314,7 +4400,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
 "bytes",
 "heck 0.5.0",
 "itertools 0.10.5",
 "itertools 0.12.1",
 "log",
 "multimap",
 "once_cell",
@@ -4334,7 +4420,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
dependencies = [
 "anyhow",
 "itertools 0.10.5",
 "itertools 0.12.1",
 "proc-macro2",
 "quote",
 "syn 2.0.52",
@@ -4457,6 +4543,15 @@ dependencies = [
 "x509-parser",
]

[[package]]
name = "quick-xml"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
dependencies = [
 "memchr",
]

[[package]]
name = "quick-xml"
version = "0.31.0"
@@ -4938,6 +5033,15 @@ dependencies = [
 "subtle",
]

[[package]]
name = "rgb"
version = "0.8.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57397d16646700483b67d2dd6511d79318f9d057fdbd21a4066aeac8b41d310a"
dependencies = [
 "bytemuck",
]

[[package]]
name = "ring"
version = "0.17.6"
@@ -5237,6 +5341,7 @@ dependencies = [
 "chrono",
 "clap",
 "crc32c",
 "criterion",
 "desim",
 "fail",
 "futures",
@@ -5244,6 +5349,7 @@ dependencies = [
 "http 1.1.0",
 "humantime",
 "hyper 0.14.30",
 "itertools 0.10.5",
 "metrics",
 "once_cell",
 "parking_lot 0.12.1",
@@ -5251,6 +5357,7 @@ dependencies = [
 "postgres-protocol",
 "postgres_backend",
 "postgres_ffi",
 "pprof",
 "pq_proto",
 "rand 0.8.5",
 "regex",
@@ -5797,6 +5904,12 @@ dependencies = [
 "der 0.7.8",
]

[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"

[[package]]
name = "static_assertions"
version = "1.1.0"
@@ -5942,6 +6055,12 @@ dependencies = [
 "workspace_hack",
]

[[package]]
name = "str_stack"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"

[[package]]
name = "stringprep"
version = "0.1.2"
@@ -5989,6 +6108,29 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20e16a0f46cf5fd675563ef54f26e83e20f2366bcf027bcb3cc3ed2b98aaf2ca"

[[package]]
name = "symbolic-common"
version = "12.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "366f1b4c6baf6cfefc234bbd4899535fca0b06c74443039a73f6dfb2fad88d77"
dependencies = [
 "debugid",
 "memmap2",
 "stable_deref_trait",
 "uuid",
]

[[package]]
name = "symbolic-demangle"
version = "12.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aba05ba5b9962ea5617baf556293720a8b2d0a282aa14ee4bf10e22efc7da8c8"
dependencies = [
 "cpp_demangle",
 "rustc-demangle",
 "symbolic-common",
]

[[package]]
name = "syn"
version = "1.0.109"
@@ -130,6 +130,7 @@ parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
pprof = { version = "0.13", features = ["criterion", "flamegraph"] }
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13"
@@ -1,10 +1,10 @@
use std::ffi::CStr;
use std::ffi::{CStr, CString};

use bytes::{Bytes, BytesMut};
use crc32c::crc32c_append;
use utils::lsn::Lsn;

use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
use super::xlog_utils::{
    XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE,
    XLP_FIRST_IS_CONTRECORD,
@@ -16,11 +16,65 @@ use crate::pg_constants::{
};
use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};

/// Generates binary WAL records for use in tests and benchmarks. Currently only generates logical
/// messages (effectively noops) with a fixed payload. It is used as an iterator which yields
/// encoded bytes for a single WAL record, including internal page headers if it spans pages.
/// Concatenating the bytes will yield a complete, well-formed WAL, which can be chunked at segment
/// boundaries if desired. Not optimized for performance.
/// A WAL record payload. Will be prefixed by an XLogRecord header when encoded.
pub struct Record {
    pub rmid: RmgrId,
    pub info: u8,
    pub data: Bytes,
}

impl Record {
    /// Encodes the WAL record including an XLogRecord header. prev_lsn is the start position of
    /// the previous record in the WAL -- this is ignored by the Safekeeper, but not Postgres.
    pub fn encode(&self, prev_lsn: Lsn) -> Bytes {
        // Prefix data with block ID and length.
        let data_header = Bytes::from(match self.data.len() {
            0 => vec![],
            1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, self.data.len() as u8],
            256.. => {
                let len_bytes = (self.data.len() as u32).to_le_bytes();
                [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat()
            }
        });

        // Construct the WAL record header.
        let mut header = XLogRecord {
            xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32,
            xl_xid: 0,
            xl_prev: prev_lsn.into(),
            xl_info: self.info,
            xl_rmid: self.rmid,
            __bindgen_padding_0: [0; 2],
            xl_crc: 0, // see below
        };

        // Compute the CRC checksum for the data, and the header up to the CRC field.
        let mut crc = 0;
        crc = crc32c_append(crc, &data_header);
        crc = crc32c_append(crc, &self.data);
        crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]);
        header.xl_crc = crc;

        // Encode the final header and record.
        let header = header.encode().unwrap();

        [header, data_header, self.data.clone()].concat().into()
    }
}

/// Generates WAL record payloads.
///
/// TODO: currently only provides LogicalMessageGenerator for trivial noop messages. Add a generator
/// that creates a table and inserts rows.
pub trait RecordGenerator: Iterator<Item = Record> {}

impl<I: Iterator<Item = Record>> RecordGenerator for I {}

/// Generates binary WAL for use in tests and benchmarks. The provided record generator constructs
/// the WAL records. It is used as an iterator which yields encoded bytes for a single WAL record,
/// including internal page headers if it spans pages. Concatenating the bytes will yield a
/// complete, well-formed WAL, which can be chunked at segment boundaries if desired. Not optimized
/// for performance.
///
/// The WAL format is version-dependent (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this
/// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`).
@@ -31,10 +85,10 @@ use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
/// | Segment 1 | Segment 2 | Segment 3 |
/// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 |
/// | R1 | R2 |R3| R4 | R5 | R6 | R7 | R8 |
///
/// TODO: support generating actual tables and rows.
#[derive(Default)]
pub struct WalGenerator {
pub struct WalGenerator<R: RecordGenerator> {
    /// Generates record payloads for the WAL.
    pub record_generator: R,
    /// Current LSN to append the next record at.
    ///
    /// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should
@@ -46,73 +100,35 @@ pub struct WalGenerator {
    pub prev_lsn: Lsn,
}

impl WalGenerator {
    // For now, hardcode the message payload.
    // TODO: support specifying the payload size.
    const PREFIX: &CStr = c"prefix";
    const MESSAGE: &[u8] = b"message";

    // Hardcode the sys, timeline, and DB IDs. We can make them configurable if we care about them.
impl<R: RecordGenerator> WalGenerator<R> {
    // Hardcode the sys and timeline ID. We can make them configurable if we care about them.
    const SYS_ID: u64 = 0;
    const TIMELINE_ID: u32 = 1;
    const DB_ID: u32 = 0;

    /// Creates a new WAL generator, which emits logical message records (noops).
    pub fn new() -> Self {
        Self::default()
    /// Creates a new WAL generator with the given record generator.
    pub fn new(record_generator: R) -> WalGenerator<R> {
        Self {
            record_generator,
            lsn: Lsn(0),
            prev_lsn: Lsn(0),
        }
    }

    /// Encodes a logical message (basically a noop), with the given prefix and message.
    pub(crate) fn encode_logical_message(prefix: &CStr, message: &[u8]) -> Bytes {
        let prefix = prefix.to_bytes_with_nul();
        let header = XlLogicalMessage {
            db_id: Self::DB_ID,
            transactional: 0,
            prefix_size: prefix.len() as u64,
            message_size: message.len() as u64,
        };
        [&header.encode(), prefix, message].concat().into()
    /// Appends a record with an arbitrary payload at the current LSN, then increments the LSN.
    /// Returns the WAL bytes for the record, including page headers and padding, and the start LSN.
    fn append_record(&mut self, record: Record) -> (Lsn, Bytes) {
        let record = record.encode(self.prev_lsn);
        let record = Self::insert_pages(record, self.lsn);
        let record = Self::pad_record(record, self.lsn);
        let lsn = self.lsn;
        self.prev_lsn = self.lsn;
        self.lsn += record.len() as u64;
        (lsn, record)
    }

    /// Encode a WAL record with the given payload data (e.g. a logical message).
    pub(crate) fn encode_record(data: Bytes, rmid: u8, info: u8, prev_lsn: Lsn) -> Bytes {
        // Prefix data with block ID and length.
        let data_header = Bytes::from(match data.len() {
            0 => vec![],
            1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, data.len() as u8],
            256.. => {
                let len_bytes = (data.len() as u32).to_le_bytes();
                [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat()
            }
        });

        // Construct the WAL record header.
        let mut header = XLogRecord {
            xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + data.len()) as u32,
            xl_xid: 0,
            xl_prev: prev_lsn.into(),
            xl_info: info,
            xl_rmid: rmid,
            __bindgen_padding_0: [0; 2],
            xl_crc: 0, // see below
        };

        // Compute the CRC checksum for the data, and the header up to the CRC field.
        let mut crc = 0;
        crc = crc32c_append(crc, &data_header);
        crc = crc32c_append(crc, &data);
        crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]);
        header.xl_crc = crc;

        // Encode the final header and record.
        let header = header.encode().unwrap();

        [header, data_header, data].concat().into()
    }

    /// Injects page headers on 8KB page boundaries. Takes the current LSN position where the record
    /// Inserts page headers on 8KB page boundaries. Takes the current LSN position where the record
    /// is to be appended.
    fn encode_pages(record: Bytes, mut lsn: Lsn) -> Bytes {
    fn insert_pages(record: Bytes, mut lsn: Lsn) -> Bytes {
        // Fast path: record fits in current page, and the page already has a header.
        if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 {
            return record;
@@ -173,31 +189,71 @@ impl WalGenerator {
        }
        [record, Bytes::from(vec![0; padding])].concat().into()
    }

    /// Generates a record with an arbitrary payload at the current LSN, then increments the LSN.
    pub fn generate_record(&mut self, data: Bytes, rmid: u8, info: u8) -> Bytes {
        let record = Self::encode_record(data, rmid, info, self.prev_lsn);
        let record = Self::encode_pages(record, self.lsn);
        let record = Self::pad_record(record, self.lsn);
        self.prev_lsn = self.lsn;
        self.lsn += record.len() as u64;
        record
    }

    /// Generates a logical message at the current LSN. Can be used to construct arbitrary messages.
    pub fn generate_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> Bytes {
        let data = Self::encode_logical_message(prefix, message);
        self.generate_record(data, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE)
    }
}

/// Generate WAL records as an iterator.
impl Iterator for WalGenerator {
/// Generates WAL records as an iterator.
impl<R: RecordGenerator> Iterator for WalGenerator<R> {
    type Item = (Lsn, Bytes);

    fn next(&mut self) -> Option<Self::Item> {
        let lsn = self.lsn;
        let record = self.generate_logical_message(Self::PREFIX, Self::MESSAGE);
        Some((lsn, record))
        let record = self.record_generator.next()?;
        Some(self.append_record(record))
    }
}

/// Generates logical message records (effectively noops) with a fixed message.
pub struct LogicalMessageGenerator {
    prefix: CString,
    message: Vec<u8>,
}

impl LogicalMessageGenerator {
    const DB_ID: u32 = 0; // hardcoded for now
    const RM_ID: RmgrId = RM_LOGICALMSG_ID;
    const INFO: u8 = XLOG_LOGICAL_MESSAGE;

    /// Creates a new LogicalMessageGenerator.
    pub fn new(prefix: &CStr, message: &[u8]) -> Self {
        Self {
            prefix: prefix.to_owned(),
            message: message.to_owned(),
        }
    }

    /// Encodes a logical message.
    fn encode(prefix: &CStr, message: &[u8]) -> Bytes {
        let prefix = prefix.to_bytes_with_nul();
        let header = XlLogicalMessage {
            db_id: Self::DB_ID,
            transactional: 0,
            prefix_size: prefix.len() as u64,
            message_size: message.len() as u64,
        };
        [&header.encode(), prefix, message].concat().into()
    }
}

impl Iterator for LogicalMessageGenerator {
    type Item = Record;

    fn next(&mut self) -> Option<Self::Item> {
        Some(Record {
            rmid: Self::RM_ID,
            info: Self::INFO,
            data: Self::encode(&self.prefix, &self.message),
        })
    }
}

impl WalGenerator<LogicalMessageGenerator> {
    /// Convenience method for appending a WAL record with an arbitrary logical message at the
    /// current WAL LSN position. Returns the start LSN and resulting WAL bytes.
    pub fn append_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> (Lsn, Bytes) {
        let record = Record {
            rmid: LogicalMessageGenerator::RM_ID,
            info: LogicalMessageGenerator::INFO,
            data: LogicalMessageGenerator::encode(prefix, message),
        };
        self.append_record(record)
    }
}
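As a reading aid (editor's illustration, not part of the commit), here is a minimal sketch of how the refactored API composes, assuming the module path named in the doc comment above (`postgres_ffi::v17::wal_generator`): `LogicalMessageGenerator` yields `Record` payloads, and `WalGenerator` frames each one with record and page headers plus padding, yielding `(Lsn, Bytes)` pairs whose concatenation is well-formed WAL.

```rust
use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};

/// Collects n framed logical-message records into a single WAL byte stream.
fn generate_wal(n: usize) -> Vec<u8> {
    // Wrap a payload generator in the WAL framing iterator; it yields (Lsn, Bytes).
    let walgen = WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
    let mut wal = Vec::new();
    for (_lsn, bytes) in walgen.take(n) {
        // Concatenating the per-record bytes yields complete, well-formed WAL.
        wal.extend_from_slice(&bytes);
    }
    wal
}
```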
@@ -12,9 +12,9 @@ use super::bindings::{
    CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
    XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
};
use super::wal_generator::WalGenerator;
use super::wal_generator::LogicalMessageGenerator;
use super::PG_MAJORVERSION;
use crate::pg_constants::{self, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE};
use crate::pg_constants;
use crate::PG_TLI;
use crate::{uint32, uint64, Oid};
use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
@@ -493,12 +493,10 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Bytes {
    // This function can take untrusted input, so discard any NUL bytes in the prefix string.
    let prefix = CString::new(prefix.replace('\0', "")).expect("no NULs");
    let message = message.as_bytes();
    WalGenerator::encode_record(
        WalGenerator::encode_logical_message(&prefix, message),
        RM_LOGICALMSG_ID,
        XLOG_LOGICAL_MESSAGE,
        Lsn(0),
    )
    LogicalMessageGenerator::new(&prefix, message)
        .next()
        .unwrap()
        .encode(Lsn(0))
}

#[cfg(test)]
@@ -61,8 +61,15 @@ utils.workspace = true
workspace_hack.workspace = true

[dev-dependencies]
criterion.workspace = true
itertools.workspace = true
pprof.workspace = true
walproposer.workspace = true
rand.workspace = true
desim.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }

[[bench]]
name = "receive_wal"
harness = false
safekeeper/benches/README.md (new file, 26 lines)
@@ -0,0 +1,26 @@
## Safekeeper Benchmarks

To run benchmarks:

```sh
# All benchmarks.
cargo bench --package safekeeper

# Specific file.
cargo bench --package safekeeper --bench receive_wal

# Specific benchmark.
cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false

# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
# Output in target/criterion/*/profile/flamegraph.svg.
cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false --profile-time 10

# List available benchmarks.
cargo bench --package safekeeper --benches -- --list
```

Additional charts and statistics are available in `target/criterion/report/index.html`.

Benchmarks are automatically compared against the previous run. To compare against other runs, see `--baseline` and `--save-baseline`.
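For example (an illustrative sketch using Criterion's standard baseline flags, not text from the README itself), a baseline can be recorded on one run and compared against later:

```sh
# Record a baseline named "main", then compare a later run against it.
cargo bench --package safekeeper --bench receive_wal -- --save-baseline main
cargo bench --package safekeeper --bench receive_wal -- --baseline main
```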
safekeeper/benches/benchutils.rs (new file, 115 lines)
@@ -0,0 +1,115 @@
use std::sync::Arc;

use camino_tempfile::Utf8TempDir;
use criterion::Criterion;
use pprof::criterion::{Output, PProfProfiler};
use safekeeper::rate_limit::RateLimiter;
use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use safekeeper::state::{TimelinePersistentState, TimelineState};
use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use safekeeper::timelines_set::TimelinesSet;
use safekeeper::wal_backup::remote_timeline_path;
use safekeeper::{control_file, wal_storage, SafeKeeperConf};
use tokio::fs::create_dir_all;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;

/// Configures Criterion.
pub fn setup_criterion() -> Criterion {
    let mut c = Criterion::default();

    // Use pprof-rs for profiles, with flamegraph SVG output in
    // target/criterion/*/profile/flamegraph.svg
    c = c.with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));

    c
}

/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
pub struct Env {
    /// Whether to enable fsync.
    pub fsync: bool,
    /// Benchmark directory. Deleted when dropped.
    pub tempdir: Utf8TempDir,
}

impl Env {
    /// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
    /// enable fsyncing.
    pub fn new(fsync: bool) -> anyhow::Result<Self> {
        let tempdir = camino_tempfile::tempdir()?;
        Ok(Self { fsync, tempdir })
    }

    /// Constructs a Safekeeper config for the given node ID.
    fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
        let mut conf = SafeKeeperConf::dummy();
        conf.my_id = node_id;
        conf.no_sync = !self.fsync;
        conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
        conf
    }

    /// Constructs a Safekeeper with the given node and tenant/timeline ID.
    ///
    /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
    /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
    /// currently possible to construct a timeline using non-file storage since StateSK only accepts
    /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
    pub async fn make_safekeeper(
        &self,
        node_id: NodeId,
        ttid: TenantTimelineId,
    ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
        let conf = self.make_conf(node_id);

        let timeline_dir = get_timeline_dir(&conf, &ttid);
        create_dir_all(&timeline_dir).await?;

        let mut pstate = TimelinePersistentState::empty();
        pstate.tenant_id = ttid.tenant_id;
        pstate.timeline_id = ttid.timeline_id;

        let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
        let ctrl =
            control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
        let state = TimelineState::new(ctrl);
        let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;

        // Emulate an initial election.
        safekeeper
            .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
                term: 1,
                start_streaming_at: Lsn(0),
                term_history: TermHistory(vec![(1, Lsn(0)).into()]),
                timeline_start_lsn: Lsn(0),
            }))
            .await?;

        Ok(safekeeper)
    }

    /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
    /// manager task.
    pub async fn make_timeline(
        &self,
        node_id: NodeId,
        ttid: TenantTimelineId,
    ) -> anyhow::Result<Arc<Timeline>> {
        let conf = self.make_conf(node_id);
        let timeline_dir = get_timeline_dir(&conf, &ttid);
        let remote_path = remote_timeline_path(&ttid)?;

        let safekeeper = self.make_safekeeper(node_id, ttid).await?;
        let shared_state = SharedState::new(StateSK::Loaded(safekeeper));

        let timeline = Timeline::new(ttid, &timeline_dir, &remote_path, shared_state);
        timeline.bootstrap(
            &mut timeline.write_shared_state().await,
            &conf,
            Arc::new(TimelinesSet::default()), // ignored for now
            RateLimiter::new(0, 0),
        );
        Ok(timeline)
    }
}
safekeeper/benches/receive_wal.rs (new file, 249 lines)
@@ -0,0 +1,249 @@
//! WAL ingestion benchmarks.

#[path = "benchutils.rs"]
mod benchutils;

use benchutils::{setup_criterion, Env};
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
use itertools::Itertools as _;
use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
use safekeeper::receive_wal::{self, WalAcceptor};
use safekeeper::safekeeper::{
    AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
};
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;

const KB: usize = 1024;
const MB: usize = 1024 * KB;
const GB: usize = 1024 * MB;

// Register benchmarks with Criterion.
criterion_group!(
    name = benches;
    config = setup_criterion();
    targets = bench_process_msg, bench_wal_acceptor, bench_wal_acceptor_throughput
);
criterion_main!(benches);

/// Benchmarks SafeKeeper::process_msg() as time per message. Each message is an AppendRequest with
/// a single WAL record containing a tiny XlLogicalMessage (~64 bytes total).
fn bench_process_msg(c: &mut Criterion) {
    let mut g = c.benchmark_group("process_msg");
    for fsync in [false, true] {
        g.bench_function(format!("fsync={fsync}"), |b| run_bench(b, fsync).unwrap());
    }

    // The actual benchmark.
    fn run_bench(b: &mut Bencher, fsync: bool) -> anyhow::Result<()> {
        let runtime = tokio::runtime::Builder::new_current_thread() // single is fine, sync IO only
            .enable_all()
            .build()?;

        let env = Env::new(fsync)?;
        let mut safekeeper =
            runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
        let mut walgen = WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));

        b.iter_batched_ref(
            // Pre-construct WAL records and requests. Criterion will batch them.
            || {
                let (lsn, record) = walgen.next().expect("endless WAL");
                ProposerAcceptorMessage::AppendRequest(AppendRequest {
                    h: AppendRequestHeader {
                        term: 1,
                        term_start_lsn: Lsn(0),
                        begin_lsn: lsn,
                        end_lsn: lsn + record.len() as u64,
                        commit_lsn: Lsn(0),
                        truncate_lsn: Lsn(0),
                        proposer_uuid: [0; 16],
                    },
                    wal_data: record,
                })
            },
            // Benchmark message processing (time per message).
            |msg| {
                runtime
                    .block_on(safekeeper.process_msg(msg))
                    .expect("message failed")
            },
            BatchSize::SmallInput, // automatically determine a batch size
        );
        Ok(())
    }
}

/// Benchmarks WalAcceptor message processing time by sending it a batch of WAL records and waiting
/// for it to confirm that the last LSN has been flushed to storage. We pipeline a bunch of messages
/// instead of measuring each individual message to amortize costs (e.g. fsync), which is more
/// realistic. Records are XlLogicalMessage with a tiny payload (~64 bytes per record including
/// headers). Records are pre-constructed to avoid skewing the benchmark.
///
/// TODO: add benchmarks with in-memory storage, see comment on `Env::make_safekeeper()`.
fn bench_wal_acceptor(c: &mut Criterion) {
    let mut g = c.benchmark_group("wal_acceptor");
    for fsync in [false, true] {
        for n in [1, 100, 10000] {
            g.bench_function(format!("fsync={fsync}/n={n}"), |b| {
                run_bench(b, n, fsync).unwrap()
            });
        }
    }

    /// The actual benchmark. n is the number of WAL records to send in a pipelined batch.
    fn run_bench(b: &mut Bencher, n: usize, fsync: bool) -> anyhow::Result<()> {
        let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded

        let env = Env::new(fsync)?;
        let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));

        // Create buffered channels that can fit all requests, to avoid blocking on channels.
        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);
        let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(n);

        // Spawn the WalAcceptor task.
        runtime.block_on(async {
            // TODO: WalAcceptor doesn't actually need a full timeline, only
            // Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
            let tli = env
                .make_timeline(NodeId(1), TenantTimelineId::generate())
                .await?
                .wal_residence_guard()
                .await?;
            WalAcceptor::spawn(tli, msg_rx, reply_tx, Some(0));
            anyhow::Ok(())
        })?;

        b.iter_batched(
            // Pre-construct a batch of WAL records and requests.
            || {
                walgen
                    .take(n)
                    .map(|(lsn, record)| AppendRequest {
                        h: AppendRequestHeader {
                            term: 1,
                            term_start_lsn: Lsn(0),
                            begin_lsn: lsn,
                            end_lsn: lsn + record.len() as u64,
                            commit_lsn: Lsn(0),
                            truncate_lsn: Lsn(0),
                            proposer_uuid: [0; 16],
                        },
                        wal_data: record,
                    })
                    .collect_vec()
            },
            // Benchmark batch ingestion (time per batch).
            |reqs| {
                runtime.block_on(async {
                    let final_lsn = reqs.last().unwrap().h.end_lsn;
                    // Stuff all the messages into the buffered channel to pipeline them.
                    for req in reqs {
                        let msg = ProposerAcceptorMessage::AppendRequest(req);
                        msg_tx.send(msg).await.expect("send failed");
                    }
                    // Wait for the last message to get flushed.
                    while let Some(reply) = reply_rx.recv().await {
                        if let AcceptorProposerMessage::AppendResponse(resp) = reply {
                            if resp.flush_lsn >= final_lsn {
                                return;
                            }
                        }
                    }
                    panic!("disconnected")
                })
            },
            BatchSize::PerIteration, // only run one request batch at a time
        );
        Ok(())
    }
}

/// Benchmarks WalAcceptor throughput by sending 1 GB of data with varying message sizes and waiting
/// for the last LSN to be flushed to storage. Only the actual message payload counts towards
/// throughput, headers are excluded and considered overhead. Records are XlLogicalMessage.
///
/// To avoid running out of memory, messages are constructed during the benchmark.
fn bench_wal_acceptor_throughput(c: &mut Criterion) {
    const VOLUME: usize = GB; // NB: excludes message/page/segment headers and padding

    let mut g = c.benchmark_group("wal_acceptor_throughput");
    g.sample_size(10);
    g.throughput(criterion::Throughput::Bytes(VOLUME as u64));

    for fsync in [false, true] {
        for size in [KB, 4 * KB, 128 * KB, MB] {
            assert_eq!(VOLUME % size, 0, "volume must be divisible by size");
            let count = VOLUME / size;
            g.bench_function(format!("fsync={fsync}/size={size}"), |b| {
                run_bench(b, count, size, fsync).unwrap()
            });
        }
    }

    /// The actual benchmark. size is the payload size per message, count is the number of messages.
    fn run_bench(b: &mut Bencher, count: usize, size: usize, fsync: bool) -> anyhow::Result<()> {
        let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded

        // Construct the payload. The prefix counts towards the payload (including NUL terminator).
        let prefix = c"p";
        let prefixlen = prefix.to_bytes_with_nul().len();
        assert!(size >= prefixlen);
        let message = vec![0; size - prefixlen];

        let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));

        // Construct and spawn the WalAcceptor task.
        let env = Env::new(fsync)?;

        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
        let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);

        runtime.block_on(async {
            let tli = env
                .make_timeline(NodeId(1), TenantTimelineId::generate())
                .await?
                .wal_residence_guard()
                .await?;
            WalAcceptor::spawn(tli, msg_rx, reply_tx, Some(0));
            anyhow::Ok(())
        })?;

        // Ingest the WAL.
        b.iter(|| {
            runtime.block_on(async {
                let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest {
                    h: AppendRequestHeader {
                        term: 1,
                        term_start_lsn: Lsn(0),
                        begin_lsn: lsn,
                        end_lsn: lsn + record.len() as u64,
                        commit_lsn: Lsn(0),
                        truncate_lsn: Lsn(0),
                        proposer_uuid: [0; 16],
                    },
                    wal_data: record,
                });

                // Send requests.
                for req in reqgen {
                    _ = reply_rx.try_recv(); // discard any replies, to avoid blocking
                    let msg = ProposerAcceptorMessage::AppendRequest(req);
                    msg_tx.send(msg).await.expect("send failed");
                }

                // Wait for last message to get flushed.
                while let Some(reply) = reply_rx.recv().await {
                    if let AcceptorProposerMessage::AppendResponse(resp) = reply {
                        if resp.flush_lsn >= walgen.lsn {
                            return;
                        }
                    }
                }
                panic!("disconnected")
            })
        });
        Ok(())
    }
}
@@ -112,9 +112,7 @@ impl SafeKeeperConf {
}

impl SafeKeeperConf {
    #[cfg(test)]
    #[allow(unused)]
    fn dummy() -> Self {
    pub fn dummy() -> Self {
        SafeKeeperConf {
            workdir: Utf8PathBuf::from("./"),
            no_sync: false,

@@ -138,7 +138,6 @@ impl TimelinePersistentState {
        })
    }

    #[cfg(test)]
    pub fn empty() -> Self {
        TimelinePersistentState::new(
            &TenantTimelineId::empty(),

@@ -2,7 +2,7 @@
//! to glue together SafeKeeper and all other background services.

use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use camino::{Utf8Path, Utf8PathBuf};
use remote_storage::RemotePath;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
@@ -325,8 +325,17 @@ pub struct SharedState {
}

impl SharedState {
    /// Creates a new SharedState.
    pub fn new(sk: StateSK) -> Self {
        Self {
            sk,
            peers_info: PeersInfo(vec![]),
            wal_removal_on_hold: false,
        }
    }

    /// Restore SharedState from control file. If file doesn't exist, bails out.
    fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
    pub fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
        let timeline_dir = get_timeline_dir(conf, ttid);
        let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
        if control_store.server.wal_seg_size == 0 {
@@ -352,11 +361,7 @@ impl SharedState {
            }
        };

        Ok(Self {
            sk,
            peers_info: PeersInfo(vec![]),
            wal_removal_on_hold: false,
        })
        Ok(Self::new(sk))
    }

    pub(crate) fn get_wal_seg_size(&self) -> usize {
@@ -480,11 +485,13 @@ pub struct Timeline {
}

impl Timeline {
    /// Load existing timeline from disk.
    pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
        let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();

        let shared_state = SharedState::restore(conf, &ttid)?;
    /// Constructs a new timeline.
    pub fn new(
        ttid: TenantTimelineId,
        timeline_dir: &Utf8Path,
        remote_path: &RemotePath,
        shared_state: SharedState,
    ) -> Arc<Self> {
        let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
            watch::channel(shared_state.sk.state().commit_lsn);
        let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
@@ -494,10 +501,11 @@ impl Timeline {
        let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);

        let walreceivers = WalReceivers::new();
        let remote_path = remote_timeline_path(&ttid)?;
        Ok(Arc::new(Timeline {

        Arc::new(Self {
            ttid,
            remote_path,
            remote_path: remote_path.to_owned(),
            timeline_dir: timeline_dir.to_owned(),
            commit_lsn_watch_tx,
            commit_lsn_watch_rx,
            term_flush_lsn_watch_tx,
@@ -508,13 +516,28 @@ impl Timeline {
            walsenders: WalSenders::new(walreceivers.clone()),
            walreceivers,
            cancel: CancellationToken::default(),
            timeline_dir: get_timeline_dir(conf, &ttid),
            manager_ctl: ManagerCtl::new(),
            broker_active: AtomicBool::new(false),
            wal_backup_active: AtomicBool::new(false),
            last_removed_segno: AtomicU64::new(0),
            mgr_status: AtomicStatus::new(),
        }))
        })
    }

    /// Load existing timeline from disk.
    pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
        let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();

        let shared_state = SharedState::restore(conf, &ttid)?;
        let timeline_dir = get_timeline_dir(conf, &ttid);
        let remote_path = remote_timeline_path(&ttid)?;

        Ok(Timeline::new(
            ttid,
            &timeline_dir,
            &remote_path,
            shared_state,
        ))
    }

    /// Initialize fresh timeline on disk and start background tasks. If init
@@ -1128,13 +1151,13 @@ async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {

/// Get a path to the tenant directory. If you just need to get a timeline directory,
/// use WalResidentTimeline::get_timeline_dir instead.
pub(crate) fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
pub fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
    conf.workdir.join(tenant_id.to_string())
}

/// Get a path to the timeline directory. If you need to read WAL files from disk,
/// use WalResidentTimeline::get_timeline_dir instead. This function does not check
/// timeline eviction status and WAL files might not be present on disk.
pub(crate) fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
pub fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
    get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
}

@@ -1,7 +1,7 @@
use std::{ffi::CStr, sync::Arc};

use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::v16::wal_generator::WalGenerator;
use postgres_ffi::v16::wal_generator::{LogicalMessageGenerator, WalGenerator};
use utils::lsn::Lsn;

use super::block_storage::BlockStorage;
@@ -18,7 +18,7 @@ impl DiskWalProposer {
            internal_available_lsn: Lsn(0),
            prev_lsn: Lsn(0),
            disk: BlockStorage::new(),
            wal_generator: WalGenerator::new(),
            wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
        }),
    })
}
@@ -36,7 +36,7 @@ pub struct State {
    // actual WAL storage
    disk: BlockStorage,
    // WAL record generator
    wal_generator: WalGenerator,
    wal_generator: WalGenerator<LogicalMessageGenerator>,
}

impl State {
@@ -64,7 +64,7 @@ impl State {

    /// Inserts a logical record in the WAL at the current LSN.
    pub fn insert_logical_message(&mut self, prefix: &CStr, msg: &[u8]) {
        let record = self.wal_generator.generate_logical_message(prefix, msg);
        let (_, record) = self.wal_generator.append_logical_message(prefix, msg);
        self.disk.write(self.internal_available_lsn.into(), &record);
        self.prev_lsn = self.internal_available_lsn;
        self.internal_available_lsn += record.len() as u64;