mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
6 Commits
split-prox
...
pg-checksu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0eca1d19de | ||
|
|
53b9cb915e | ||
|
|
cc6ffb558d | ||
|
|
b135dbb85d | ||
|
|
6059801943 | ||
|
|
2501afba6e |
@@ -427,6 +427,7 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
data_checksums_enabled: Some(true),
|
||||
})
|
||||
.send()?
|
||||
.error_from_body()?
|
||||
@@ -436,7 +437,7 @@ impl PageServerNode {
|
||||
.map(|id| {
|
||||
id.parse().with_context(|| {
|
||||
format!(
|
||||
"Failed to parse tennat creation response as tenant id: {}",
|
||||
"Failed to parse tenant creation response as tenant id: {}",
|
||||
id
|
||||
)
|
||||
})
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pg_checksum_page::pg_checksum_page;
|
||||
|
||||
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
|
||||
|
||||
@@ -56,3 +57,55 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
|
||||
pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
|
||||
pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
|
||||
}
|
||||
|
||||
/// Calculate page checksum and stamp it onto the page.
|
||||
/// NB: this will zero out and ignore any existing checksum.
|
||||
/// # Safety
|
||||
/// See safety notes for `pg_checksum_page`
|
||||
pub unsafe fn page_set_checksum(page: &mut [u8], blkno: u32) {
|
||||
let checksum = pg_checksum_page(page, blkno);
|
||||
page[8..10].copy_from_slice(&checksum.to_le_bytes());
|
||||
}
|
||||
|
||||
/// Check if page checksum is valid.
|
||||
/// # Safety
|
||||
/// See safety notes for `pg_checksum_page`
|
||||
pub unsafe fn page_verify_checksum(page: &[u8], blkno: u32) -> bool {
|
||||
let checksum = pg_checksum_page(page, blkno);
|
||||
checksum == u16::from_le_bytes(page[8..10].try_into().unwrap())
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use crate::pg_constants::BLCKSZ;
    use crate::{page_set_checksum, page_verify_checksum};
    use utils::pg_checksum_page::pg_checksum_page;

    /// Stamping a checksum onto a page makes it verify for that block number
    /// only.
    #[test]
    fn set_and_verify_checksum() {
        const PAGE_SIZE: usize = BLCKSZ as usize;

        // Build a page with a repeating byte pattern; the bytes that land in
        // the checksum field are not a valid checksum.
        let mut page = [0u8; PAGE_SIZE];
        for (offset, slot) in page.iter_mut().enumerate() {
            *slot = offset as u8;
        }

        // What the checksum should be for block 0.
        let expected = unsafe { pg_checksum_page(&page[..], 0) };

        // Sanity check: the pattern bytes currently stored in the checksum
        // field must not happen to equal the real checksum.
        let stored = u16::from_le_bytes([page[8], page[9]]);
        assert_ne!(expected, stored);

        // Stamp the real checksum onto the page.
        unsafe { page_set_checksum(&mut page, 0) };

        // The page now verifies for block 0 ...
        let valid_for_block_0 = unsafe { page_verify_checksum(&page[..], 0) };
        assert!(valid_for_block_0);

        // ... but not for any other block number.
        let valid_for_block_1 = unsafe { page_verify_checksum(&page[..], 1) };
        assert!(!valid_for_block_1);
    }
}
|
||||
|
||||
@@ -14,7 +14,6 @@ use super::XLogLongPageHeaderData;
|
||||
use super::XLogPageHeaderData;
|
||||
use super::XLogRecord;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use crc32c::*;
|
||||
use log::*;
|
||||
use std::cmp::min;
|
||||
use thiserror::Error;
|
||||
@@ -198,18 +197,12 @@ impl WalStreamDecoder {
|
||||
}
|
||||
|
||||
// We now have a record in the 'recordbuf' local variable.
|
||||
let xlogrec =
|
||||
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
|
||||
WalDecodeError {
|
||||
msg: format!("xlog record deserialization failed {}", e),
|
||||
lsn: self.lsn,
|
||||
}
|
||||
})?;
|
||||
let xlogrec = XLogRecord::from_buf(&recordbuf).map_err(|e| WalDecodeError {
|
||||
msg: format!("xlog record deserialization failed {}", e),
|
||||
lsn: self.lsn,
|
||||
})?;
|
||||
|
||||
let mut crc = 0;
|
||||
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||
if crc != xlogrec.xl_crc {
|
||||
if !wal_record_verify_checksum(&xlogrec, &recordbuf) {
|
||||
return Err(WalDecodeError {
|
||||
msg: "WAL record crc mismatch".into(),
|
||||
lsn: self.lsn,
|
||||
|
||||
@@ -477,6 +477,10 @@ impl XLogRecord {
|
||||
XLogRecord::des(buf)
|
||||
}
|
||||
|
||||
pub fn from_buf(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
|
||||
XLogRecord::from_slice(&buf[0..XLOG_SIZE_OF_XLOG_RECORD])
|
||||
}
|
||||
|
||||
pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
|
||||
use utils::bin_ser::LeSer;
|
||||
XLogRecord::des_from(&mut buf.reader())
|
||||
@@ -742,3 +746,11 @@ mod tests {
|
||||
assert_eq!(checkpoint.nextXid.value, 2048);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wal_record_verify_checksum(rec: &XLogRecord, recordbuf: &Bytes) -> bool {
|
||||
let mut crc = 0;
|
||||
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||
|
||||
crc == rec.xl_crc
|
||||
}
|
||||
|
||||
@@ -56,6 +56,7 @@ impl Conf {
|
||||
.new_pg_command("initdb")?
|
||||
.arg("-D")
|
||||
.arg(self.datadir.as_os_str())
|
||||
.arg("--data-checksums")
|
||||
.args(&["-U", "postgres", "--no-instructions", "--no-sync"])
|
||||
.output()?;
|
||||
debug!("initdb output: {:?}", output);
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#![allow(unused)]
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
|
||||
use utils::pg_checksum_page::pg_checksum_page;
|
||||
use utils::zid;
|
||||
|
||||
pub fn bench_zid_stringify(c: &mut Criterion) {
|
||||
@@ -18,5 +18,20 @@ pub fn bench_zid_stringify(c: &mut Criterion) {
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_zid_stringify);
|
||||
// NB: adding `black_box` around arguments doesn't seem to change anything.
|
||||
pub fn pg_checksum_page_basic(c: &mut Criterion) {
|
||||
const BLCKSZ: usize = 8192;
|
||||
let mut page: [u8; BLCKSZ] = [0; BLCKSZ];
|
||||
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ) {
|
||||
*byte = i as u8;
|
||||
}
|
||||
|
||||
c.bench_function("pg_checksum_page_basic", |b| {
|
||||
b.iter(|| {
|
||||
unsafe { pg_checksum_page(&page[..], 0) };
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, pg_checksum_page_basic, bench_zid_stringify);
|
||||
criterion_main!(benches);
|
||||
|
||||
@@ -5,7 +5,7 @@ DATA_DIR=$3
|
||||
PORT=$4
|
||||
SYSID=`od -A n -j 24 -N 8 -t d8 $WAL_PATH/000000010000000000000002* | cut -c 3-`
|
||||
rm -fr $DATA_DIR
|
||||
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --sysid=$SYSID
|
||||
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --data-checksums --sysid=$SYSID
|
||||
echo port=$PORT >> $DATA_DIR/postgresql.conf
|
||||
REDO_POS=0x`$PG_BIN/pg_controldata -D $DATA_DIR | fgrep "REDO location"| cut -c 42-`
|
||||
declare -i WAL_SIZE=$REDO_POS+114
|
||||
|
||||
@@ -54,6 +54,9 @@ pub mod nonblock;
|
||||
// Default signal handling
|
||||
pub mod signals;
|
||||
|
||||
// Postgres checksum calculation
|
||||
pub mod pg_checksum_page;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
///
|
||||
/// we have several cases:
|
||||
|
||||
136
libs/utils/src/pg_checksum_page.rs
Normal file
136
libs/utils/src/pg_checksum_page.rs
Normal file
@@ -0,0 +1,136 @@
|
||||
///
|
||||
/// Rust implementation of Postgres pg_checksum_page
|
||||
/// See: https://github.com/postgres/postgres/blob/88210542106de5b26fe6aa088d1811b68502d224/src/include/storage/checksum_impl.h
|
||||
/// for additional comments.
|
||||
///
|
||||
/// This is not a direct port of pg_checksum_page from Postgres, though.
|
||||
/// For example, in the current state it can only produce a valid result
|
||||
/// on the little-endian platform and with the standard 8 KB page size.
|
||||
///
|
||||
|
||||
const BLCKSZ: usize = 8192;
const N_SUMS: usize = 32;
// Prime multiplier of FNV-1a hash
const FNV_PRIME: u32 = 16777619;

// Size in bytes of one 32-bit word of the page.
const WORD_BYTES: usize = std::mem::size_of::<u32>();
// Size in bytes of one 'row' of the page: one word for each parallel sum.
const ROW_BYTES: usize = N_SUMS * WORD_BYTES;
// Location of the 16-bit checksum attribute inside the page.
const PD_CHECKSUM_OFFSET: usize = 8;
const PD_CHECKSUM_LEN: usize = 2;

// Base offsets to initialize each of the parallel FNV hashes into a
// different initial state.
const CHECKSUM_BASE_OFFSETS: [u32; N_SUMS] = [
    0x5B1F36E9, 0xB8525960, 0x02AB50AA, 0x1DE66D2A, 0x79FF467A, 0x9BB9F8A3, 0x217E7CD2, 0x83E13D2C,
    0xF8D4474F, 0xE39EB970, 0x42C6AE16, 0x993216FA, 0x7B093B5D, 0x98DAFF3C, 0xF718902A, 0x0B1C9CDB,
    0xE58F764B, 0x187636BC, 0x5D7B3BB1, 0xE73DE7DE, 0x92BEC979, 0xCCA6C0B2, 0x304A0979, 0x85AA43D4,
    0x783125BB, 0x6CA8EAA2, 0xE407EAC6, 0x4B5CFC3E, 0x9FBF8C76, 0x15CA20BE, 0xF2CA9FD3, 0x959BD756,
];

// Calculate one round of the checksum.
fn checksum_comp(checksum: u32, value: u32) -> u32 {
    let tmp = checksum ^ value;
    tmp.wrapping_mul(FNV_PRIME) ^ (tmp >> 17)
}

// Mix one row of the page (`N_SUMS` consecutive 32-bit words) into the
// parallel sums. Words are assembled from individual bytes, so `row` needs no
// particular alignment.
fn checksum_row(sums: &mut [u32; N_SUMS], row: &[u8]) {
    for (sum, word) in sums.iter_mut().zip(row.chunks_exact(WORD_BYTES)) {
        let value = u32::from_ne_bytes([word[0], word[1], word[2], word[3]]);
        *sum = checksum_comp(*sum, value);
    }
}

/// Compute the checksum for a Postgres page.
///
/// The checksum includes the block number (to detect the case where a page is
/// somehow moved to a different location), the page header (excluding the
/// checksum itself), and the page data.
///
/// As in C implementation in Postgres, the checksum attribute on the page is
/// excluded from the calculation and preserved: the two checksum bytes are
/// treated as zero in a private copy of the first row, and `data` itself is
/// never modified.
///
/// The page is read as native-endian 32-bit words assembled byte by byte, so
/// `data` does not have to be aligned.
///
/// NB: after doing any modifications run `cargo bench`. The baseline on the more
/// or less recent Intel laptop was around 700ns (measured for an earlier
/// implementation of this function, so re-measure before relying on it). If
/// it's significantly higher, then it's worth looking into.
///
/// # Arguments
/// * `data` - the page to checksum; only the first 8 KB are used
/// * `blkno` - the block number of the page
///
/// # Panics
/// Panics if `data` is shorter than a standard 8 KB page.
///
/// # Safety
/// The function no longer performs any unsafe operation; it stays `unsafe` only
/// so that existing callers, which wrap the call in an `unsafe` block, keep
/// compiling unchanged. For a meaningful result `data` should be a standard
/// 8 KB Postgres page.
pub unsafe fn pg_checksum_page(data: &[u8], blkno: u32) -> u16 {
    assert!(
        data.len() >= BLCKSZ,
        "pg_checksum_page needs a full {} byte page, got {} bytes",
        BLCKSZ,
        data.len()
    );
    let (first_row, remaining_rows) = data[..BLCKSZ].split_at(ROW_BYTES);

    let mut sums = CHECKSUM_BASE_OFFSETS;

    // The first row contains the checksum attribute, which has to be seen as
    // zero by the calculation. Blank it in a small local copy so that the main
    // loop below does not need any per-word comparison.
    let mut header_row = [0u8; ROW_BYTES];
    header_row.copy_from_slice(first_row);
    for byte in &mut header_row[PD_CHECKSUM_OFFSET..PD_CHECKSUM_OFFSET + PD_CHECKSUM_LEN] {
        *byte = 0;
    }
    checksum_row(&mut sums, &header_row);

    // Main checksum calculation loop
    for row in remaining_rows.chunks_exact(ROW_BYTES) {
        checksum_row(&mut sums, row);
    }

    // Finally, add in two rounds of zeroes for additional mixing
    for _ in 0..2 {
        for sum in sums.iter_mut() {
            *sum = checksum_comp(*sum, 0);
        }
    }

    // Xor fold partial checksums together
    let folded = sums.iter().fold(0u32, |acc, sum| acc ^ *sum);

    // Mix in the block number to detect transposed pages
    let checksum = folded ^ blkno;

    // Reduce to a uint16 (to fit in the pd_checksum field) with an offset of
    // one. That avoids checksums of zero, which seems like a good idea.
    ((checksum % 65535) + 1) as u16
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::{pg_checksum_page, BLCKSZ};

    /// The stored checksum attribute must not influence the result, while the
    /// block number must.
    #[test]
    fn page_with_and_without_checksum() {
        // Shorthand for the unsafe call under test.
        let checksum_of = |page: &[u8], blkno: u32| unsafe { pg_checksum_page(page, blkno) };

        // Build a page with a repeating byte pattern; the bytes that land in
        // the checksum attribute are not a correct checksum.
        let mut page = [0u8; BLCKSZ];
        for (offset, slot) in page.iter_mut().enumerate() {
            *slot = offset as u8;
        }

        // Reference checksum of the page as built.
        let reference = checksum_of(&page[..], 0);

        // With the checksum attribute zeroed the result must not change.
        page[8..10].copy_from_slice(&[0u8; 2]);
        let with_zeroed_attr = checksum_of(&page[..], 0);
        assert_eq!(reference, with_zeroed_attr);

        // With the correct checksum stamped the result must not change either.
        page[8..10].copy_from_slice(&reference.to_le_bytes());
        let with_stamped_attr = checksum_of(&page[..], 0);
        assert_eq!(reference, with_stamped_attr);

        // Check that we protect from the page transposition, i.e. page is the
        // same, but in the wrong place.
        let with_wrong_blkno = checksum_of(&page[..], 1);
        assert_ne!(reference, with_wrong_blkno);
    }
}
|
||||
@@ -38,6 +38,7 @@ pub struct TenantCreateRequest {
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub data_checksums_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
|
||||
@@ -494,6 +494,8 @@ components:
|
||||
type: string
|
||||
compaction_threshold:
|
||||
type: string
|
||||
data_checksums_enabled:
|
||||
type: boolean
|
||||
TenantConfigInfo:
|
||||
type: object
|
||||
properties:
|
||||
|
||||
@@ -412,6 +412,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
tenant_conf.compaction_target_size = request_data.compaction_target_size;
|
||||
tenant_conf.compaction_threshold = request_data.compaction_threshold;
|
||||
|
||||
// Turn on data checksums for all new tenants
|
||||
tenant_conf.data_checksums_enabled = Some(request_data.data_checksums_enabled.unwrap_or(true));
|
||||
|
||||
if let Some(compaction_period) = request_data.compaction_period {
|
||||
tenant_conf.compaction_period =
|
||||
Some(humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?);
|
||||
|
||||
@@ -473,6 +473,7 @@ pub mod repo_harness {
|
||||
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
|
||||
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
|
||||
data_checksums_enabled: Some(tenant_conf.data_checksums_enabled),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,10 @@ pub mod defaults {
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
|
||||
|
||||
// Data checksums are turned off by default so that old tenants are not affected.
|
||||
// We turn it on explicitly for all new tenants.
|
||||
pub const DEFAULT_DATA_CHECKSUMS: bool = false;
|
||||
}
|
||||
|
||||
/// Per-tenant configuration options
|
||||
@@ -83,6 +87,7 @@ pub struct TenantConf {
|
||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub data_checksums_enabled: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -105,6 +110,7 @@ pub struct TenantConfOpt {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Option<Duration>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub data_checksums_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -135,6 +141,9 @@ impl TenantConfOpt {
|
||||
.lagging_wal_timeout
|
||||
.unwrap_or(global_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
|
||||
data_checksums_enabled: self
|
||||
.data_checksums_enabled
|
||||
.unwrap_or(global_conf.data_checksums_enabled),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,6 +181,9 @@ impl TenantConfOpt {
|
||||
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
|
||||
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||
}
|
||||
if let Some(data_checksums_enabled) = other.data_checksums_enabled {
|
||||
self.data_checksums_enabled = Some(data_checksums_enabled);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,6 +211,7 @@ impl TenantConf {
|
||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
data_checksums_enabled: DEFAULT_DATA_CHECKSUMS,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -229,6 +242,7 @@ impl TenantConf {
|
||||
.unwrap(),
|
||||
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.unwrap(),
|
||||
data_checksums_enabled: defaults::DEFAULT_DATA_CHECKSUMS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::tenant_config::TenantConfOpt;
|
||||
use crate::thread_mgr::ThreadKind;
|
||||
use crate::timelines::CreateRepo;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::{thread_mgr, timelines, walreceiver};
|
||||
use crate::{tenant_config, thread_mgr, timelines, walreceiver};
|
||||
use crate::{DatadirTimelineImpl, RepositoryImpl};
|
||||
use anyhow::{bail, Context};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -266,7 +266,14 @@ pub fn create_tenant_repository(
|
||||
Ok(None)
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
|
||||
let data_checksums_enabled = tenant_conf
|
||||
.data_checksums_enabled
|
||||
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(
|
||||
conf,
|
||||
data_checksums_enabled,
|
||||
tenant_id,
|
||||
));
|
||||
let repo = timelines::create_repo(
|
||||
conf,
|
||||
tenant_conf,
|
||||
@@ -567,10 +574,16 @@ fn load_local_repo(
|
||||
tenant_id: ZTenantId,
|
||||
remote_index: &RemoteIndex,
|
||||
) -> anyhow::Result<Arc<RepositoryImpl>> {
|
||||
// Restore tenant config
|
||||
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
|
||||
|
||||
let mut m = tenants_state::write_tenants();
|
||||
let tenant = m.entry(tenant_id).or_insert_with(|| {
|
||||
let data_checksums_enabled = tenant_conf
|
||||
.data_checksums_enabled
|
||||
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
|
||||
// Set up a WAL redo manager, for applying WAL records.
|
||||
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
|
||||
let walredo_mgr = PostgresRedoManager::new(conf, data_checksums_enabled, tenant_id);
|
||||
|
||||
// Set up an object repository, for actual data storage.
|
||||
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
|
||||
@@ -588,8 +601,6 @@ fn load_local_repo(
|
||||
}
|
||||
});
|
||||
|
||||
// Restore tenant config
|
||||
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
|
||||
tenant.repo.update_tenant_config(tenant_conf)?;
|
||||
|
||||
Ok(Arc::clone(&tenant.repo))
|
||||
|
||||
@@ -253,6 +253,7 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
|
||||
.args(&["-D", &initdbpath.to_string_lossy()])
|
||||
.args(&["-U", &conf.superuser])
|
||||
.args(&["-E", "utf8"])
|
||||
.arg("--data-checksums")
|
||||
.arg("--no-instructions")
|
||||
// This is only used for a temporary installation that is deleted shortly after,
|
||||
// so no need to fsync it
|
||||
|
||||
@@ -24,7 +24,7 @@
|
||||
use anyhow::Context;
|
||||
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
|
||||
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
|
||||
use postgres_ffi::{page_is_new, page_set_lsn};
|
||||
use postgres_ffi::{page_is_new, page_set_checksum, page_set_lsn};
|
||||
|
||||
use anyhow::Result;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
@@ -313,6 +313,8 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
if !page_is_new(&image) {
|
||||
page_set_lsn(&mut image, lsn)
|
||||
}
|
||||
unsafe { page_set_checksum(&mut image, blk.blkno) };
|
||||
|
||||
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
|
||||
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?;
|
||||
} else {
|
||||
|
||||
@@ -48,7 +48,8 @@ use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift;
|
||||
use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset;
|
||||
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset;
|
||||
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::xlog_utils::wal_record_verify_checksum;
|
||||
use postgres_ffi::{page_verify_checksum, pg_constants, XLogRecord};
|
||||
|
||||
///
|
||||
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
|
||||
@@ -131,6 +132,7 @@ lazy_static! {
|
||||
pub struct PostgresRedoManager {
|
||||
tenantid: ZTenantId,
|
||||
conf: &'static PageServerConf,
|
||||
data_checksums_enabled: bool,
|
||||
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
}
|
||||
@@ -229,11 +231,16 @@ impl PostgresRedoManager {
|
||||
///
|
||||
/// Create a new PostgresRedoManager.
|
||||
///
|
||||
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
data_checksums_enabled: bool,
|
||||
tenantid: ZTenantId,
|
||||
) -> PostgresRedoManager {
|
||||
// The actual process is launched lazily, on first request.
|
||||
PostgresRedoManager {
|
||||
tenantid,
|
||||
conf,
|
||||
data_checksums_enabled,
|
||||
process: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
@@ -268,7 +275,13 @@ impl PostgresRedoManager {
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
let result = process
|
||||
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
|
||||
.apply_wal_records(
|
||||
buf_tag,
|
||||
base_img,
|
||||
records,
|
||||
wal_redo_timeout,
|
||||
self.data_checksums_enabled,
|
||||
)
|
||||
.map_err(WalRedoError::IoError);
|
||||
|
||||
let end_time = Instant::now();
|
||||
@@ -619,6 +632,7 @@ impl PostgresRedoProcess {
|
||||
info!("running initdb in {:?}", datadir.display());
|
||||
let initdb = Command::new(conf.pg_bin_dir().join("initdb"))
|
||||
.args(&["-D", &datadir.to_string_lossy()])
|
||||
.arg("--data-checksums")
|
||||
.arg("-N")
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
|
||||
@@ -716,6 +730,7 @@ impl PostgresRedoProcess {
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, ZenithWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
data_checksums_enabled: bool,
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
@@ -725,6 +740,15 @@ impl PostgresRedoProcess {
|
||||
let mut writebuf: Vec<u8> = Vec::new();
|
||||
build_begin_redo_for_block_msg(tag, &mut writebuf);
|
||||
if let Some(img) = base_img {
|
||||
// Checksums may not have been stamped for old tenants, so check them only if they
|
||||
// are enabled (this is controlled by per-tenant config).
|
||||
if data_checksums_enabled && !unsafe { page_verify_checksum(&img, tag.blknum) } {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("block {} of relation {} is invalid", tag.blknum, tag.rel),
|
||||
));
|
||||
}
|
||||
|
||||
build_push_page_msg(tag, &img, &mut writebuf);
|
||||
}
|
||||
for (lsn, rec) in records.iter() {
|
||||
@@ -733,6 +757,27 @@ impl PostgresRedoProcess {
|
||||
rec: postgres_rec,
|
||||
} = rec
|
||||
{
|
||||
let xlogrec = XLogRecord::from_buf(postgres_rec).map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"could not deserialize WAL record for relation {} at LSN {}: {}",
|
||||
tag.rel, lsn, e
|
||||
),
|
||||
)
|
||||
})?;
|
||||
// WAL records always have a checksum, check it before sending to redo process.
|
||||
// It doesn't do these checks itself.
|
||||
if !wal_record_verify_checksum(&xlogrec, postgres_rec) {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"WAL record for relation {} at LSN {} is invalid",
|
||||
tag.rel, lsn
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
|
||||
} else {
|
||||
return Err(Error::new(
|
||||
|
||||
@@ -37,7 +37,7 @@ You can run all the tests with:
|
||||
|
||||
If you want to run all the tests in a particular file:
|
||||
|
||||
`./scripts/pytest test_pgbench.py`
|
||||
`./scripts/pytest test_runner/batch_others/test_restart_compute.py`
|
||||
|
||||
If you want to run all tests that have the string "bench" in their names:
|
||||
|
||||
|
||||
@@ -682,7 +682,7 @@ class ProposerPostgres(PgProtocol):
|
||||
def initdb(self):
|
||||
""" Run initdb """
|
||||
|
||||
args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path()]
|
||||
args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path(), "--data-checksums"]
|
||||
self.pg_bin.run(args)
|
||||
|
||||
def start(self):
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 35ad142301...b62a5c1aec
Reference in New Issue
Block a user