Compare commits

...

6 Commits

Author SHA1 Message Date
Alexey Kondratov
0eca1d19de Add safety notes, benchmark. Optimize checksum calculation 2022-07-07 20:45:50 +02:00
Alexey Kondratov
53b9cb915e Turn off data-checksums for old tenants by default and explicitly enable for new ones 2022-07-07 19:44:47 +02:00
Alexey Kondratov
cc6ffb558d Verify checksum of the page and WAL records before sending to the redo process 2022-07-07 19:44:47 +02:00
Alexey Kondratov
b135dbb85d Bump vendor/postgres 2022-07-07 19:44:47 +02:00
Alexey Kondratov
6059801943 Enable Postgres data checksums (neondatabase/cloud#536)
We need checksums to verify data integrity, when we read it from
untrusted place (e.g. local disk) or via untrusted communication channel
(e.g. network). At the same time, we trust pageserver <-> redo process
communication channel, as it is just a pipe.

Here we enable calculation of data checksums in the wal redo process and
when we extract FPI during WAL ingestion. Compute node (Postgres) will
verify checksum of every page after receiving it back from pageserver.
So it is pretty similar to how vanilla Postgres checks them.

There are two other places where we should verify checksums to
detect data corruption earlier:
- when we receive WAL records from safekeepers (already implemented,
  see: WalStreamDecoder::poll_decode)
- when we write layer files to disk and read back in memory from local
  disk or S3
2022-07-07 19:44:47 +02:00
Konstantin Knizhnik
2501afba6e Calculate postgres checksum for FPI stored in pageserver (neondatabase/cloud#536) 2022-07-07 19:44:47 +02:00
21 changed files with 323 additions and 29 deletions

View File

@@ -427,6 +427,7 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
data_checksums_enabled: Some(true),
})
.send()?
.error_from_body()?
@@ -436,7 +437,7 @@ impl PageServerNode {
.map(|id| {
id.parse().with_context(|| {
format!(
"Failed to parse tennat creation response as tenant id: {}",
"Failed to parse tenant creation response as tenant id: {}",
id
)
})

View File

@@ -9,6 +9,7 @@
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
use utils::pg_checksum_page::pg_checksum_page;
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
@@ -56,3 +57,55 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
}
/// Compute the checksum of `page` for block number `blkno` and store it in
/// bytes 8..10 of the page, encoded as little-endian.
///
/// NB: any checksum already stored on the page is excluded from the
/// calculation by `pg_checksum_page` and is overwritten here.
///
/// # Safety
/// See safety notes for `pg_checksum_page`
pub unsafe fn page_set_checksum(page: &mut [u8], blkno: u32) {
    let stamp = pg_checksum_page(page, blkno).to_le_bytes();
    page[8..10].copy_from_slice(&stamp);
}
/// Return `true` when the checksum stored in bytes 8..10 of `page`
/// (little-endian) equals the checksum recomputed for block number `blkno`.
///
/// # Safety
/// See safety notes for `pg_checksum_page`
pub unsafe fn page_verify_checksum(page: &[u8], blkno: u32) -> bool {
    let computed = pg_checksum_page(page, blkno);
    let stored = u16::from_le_bytes([page[8], page[9]]);
    computed == stored
}
#[cfg(test)]
mod tests {
    use crate::pg_constants::BLCKSZ;
    use crate::{page_set_checksum, page_verify_checksum};
    use utils::pg_checksum_page::pg_checksum_page;

    /// Stamping a checksum onto a page makes verification succeed for the
    /// same block number and fail for a different one.
    #[test]
    fn set_and_verify_checksum() {
        const PAGE_LEN: usize = BLCKSZ as usize;

        // Fill the page with a repeating byte pattern; the bytes that land in
        // the checksum field are not a valid checksum yet.
        let mut page = [0u8; PAGE_LEN];
        for (offset, slot) in page.iter_mut().enumerate() {
            *slot = offset as u8;
        }

        // Sanity check: the pattern bytes sitting in the checksum field must
        // differ from the real checksum, otherwise the test proves nothing.
        let expected = unsafe { pg_checksum_page(&page[..], 0) };
        let stored_before = u16::from_le_bytes(page[8..10].try_into().unwrap());
        assert_ne!(expected, stored_before);

        // Stamp the real checksum onto the page.
        unsafe { page_set_checksum(&mut page, 0) };

        // It verifies for the block number it was computed with...
        let valid_for_block_0 = unsafe { page_verify_checksum(&page[..], 0) };
        assert!(valid_for_block_0);

        // ...and is rejected for any other block number.
        let valid_for_block_1 = unsafe { page_verify_checksum(&page[..], 1) };
        assert!(!valid_for_block_1);
    }
}

View File

@@ -14,7 +14,6 @@ use super::XLogLongPageHeaderData;
use super::XLogPageHeaderData;
use super::XLogRecord;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crc32c::*;
use log::*;
use std::cmp::min;
use thiserror::Error;
@@ -198,18 +197,12 @@ impl WalStreamDecoder {
}
// We now have a record in the 'recordbuf' local variable.
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
lsn: self.lsn,
}
})?;
let xlogrec = XLogRecord::from_buf(&recordbuf).map_err(|e| WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
lsn: self.lsn,
})?;
let mut crc = 0;
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
if !wal_record_verify_checksum(&xlogrec, &recordbuf) {
return Err(WalDecodeError {
msg: "WAL record crc mismatch".into(),
lsn: self.lsn,

View File

@@ -477,6 +477,10 @@ impl XLogRecord {
XLogRecord::des(buf)
}
/// Deserialize the fixed-size record header found at the start of `buf`.
///
/// Only the first `XLOG_SIZE_OF_XLOG_RECORD` bytes are handed to
/// `from_slice`; any payload that follows the header is ignored.
///
/// A buffer shorter than the header is passed through unchanged instead of
/// being sliced, so a truncated or corrupted record does not panic here.
/// NOTE(review): this relies on `from_slice` rejecting short input with a
/// `DeserializeError` -- confirm against the `des` implementation.
pub fn from_buf(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
    let header = buf.get(..XLOG_SIZE_OF_XLOG_RECORD).unwrap_or(buf);
    XLogRecord::from_slice(header)
}
pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
use utils::bin_ser::LeSer;
XLogRecord::des_from(&mut buf.reader())
@@ -742,3 +746,11 @@ mod tests {
assert_eq!(checkpoint.nextXid.value, 2048);
}
}
/// Check the CRC of a WAL record against the value stored in its header.
///
/// The CRC is computed over everything that follows the 4-byte CRC field
/// first, and then over the header bytes that precede the CRC field, and is
/// compared with `rec.xl_crc`.
///
/// # Arguments
/// * `rec` - the already deserialized header of the record
/// * `recordbuf` - the raw bytes of the whole record, header included
///
/// Returns `false` when the CRC does not match, or when `recordbuf` is too
/// short to contain the CRC field at all (such a record cannot be valid, and
/// slicing it would panic).
pub fn wal_record_verify_checksum(rec: &XLogRecord, recordbuf: &Bytes) -> bool {
    let crc_field_end = XLOG_RECORD_CRC_OFFS + 4;
    if recordbuf.len() < crc_field_end {
        return false;
    }

    let crc = crc32c_append(0, &recordbuf[crc_field_end..]);
    let crc = crc32c_append(crc, &recordbuf[..XLOG_RECORD_CRC_OFFS]);
    crc == rec.xl_crc
}

View File

@@ -56,6 +56,7 @@ impl Conf {
.new_pg_command("initdb")?
.arg("-D")
.arg(self.datadir.as_os_str())
.arg("--data-checksums")
.args(&["-U", "postgres", "--no-instructions", "--no-sync"])
.output()?;
debug!("initdb output: {:?}", output);

View File

@@ -1,6 +1,6 @@
#![allow(unused)]
use criterion::{criterion_group, criterion_main, Criterion};
use utils::pg_checksum_page::pg_checksum_page;
use utils::zid;
pub fn bench_zid_stringify(c: &mut Criterion) {
@@ -18,5 +18,20 @@ pub fn bench_zid_stringify(c: &mut Criterion) {
});
}
criterion_group!(benches, bench_zid_stringify);
/// Benchmark `pg_checksum_page` on one 8 KB page filled with a repeating
/// byte pattern, always using block number 0.
///
/// The checksum is returned from the closure so that `Bencher::iter` treats
/// it as a used value; discarding it would allow the optimizer to drop the
/// computation being measured.
///
/// NB: adding `black_box` around arguments doesn't seem to change anything.
pub fn pg_checksum_page_basic(c: &mut Criterion) {
    const BLCKSZ: usize = 8192;
    let mut page: [u8; BLCKSZ] = [0; BLCKSZ];
    for (i, byte) in page.iter_mut().enumerate() {
        *byte = i as u8;
    }

    c.bench_function("pg_checksum_page_basic", |b| {
        // SAFETY: `page` is exactly BLCKSZ (8192) bytes long; see the safety
        // notes of `pg_checksum_page` for the remaining requirements.
        b.iter(|| unsafe { pg_checksum_page(&page[..], 0) })
    });
}
criterion_group!(benches, pg_checksum_page_basic, bench_zid_stringify);
criterion_main!(benches);

View File

@@ -5,7 +5,7 @@ DATA_DIR=$3
PORT=$4
SYSID=`od -A n -j 24 -N 8 -t d8 $WAL_PATH/000000010000000000000002* | cut -c 3-`
rm -fr $DATA_DIR
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --sysid=$SYSID
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --data-checksums --sysid=$SYSID
echo port=$PORT >> $DATA_DIR/postgresql.conf
REDO_POS=0x`$PG_BIN/pg_controldata -D $DATA_DIR | fgrep "REDO location"| cut -c 42-`
declare -i WAL_SIZE=$REDO_POS+114

View File

@@ -54,6 +54,9 @@ pub mod nonblock;
// Default signal handling
pub mod signals;
// Postgres checksum calculation
pub mod pg_checksum_page;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -0,0 +1,136 @@
///
/// Rust implementation of Postgres pg_checksum_page
/// See: https://github.com/postgres/postgres/blob/88210542106de5b26fe6aa088d1811b68502d224/src/include/storage/checksum_impl.h
/// for additional comments.
///
/// This is not a direct port of pg_checksum_page from Postgres, though.
/// It only supports the standard 8 KB page size, and it always decodes the
/// page as little-endian 32-bit words, i.e. it produces the checksum that
/// Postgres computes on a little-endian platform.
///
const BLCKSZ: usize = 8192;
// Number of partial checksums that are calculated in parallel.
const N_SUMS: usize = 32;
// Prime multiplier of FNV-1a hash
const FNV_PRIME: u32 = 16777619;

// Base offsets to initialize each of the parallel FNV hashes into a
// different initial state.
const CHECKSUM_BASE_OFFSETS: [u32; N_SUMS] = [
    0x5B1F36E9, 0xB8525960, 0x02AB50AA, 0x1DE66D2A, 0x79FF467A, 0x9BB9F8A3, 0x217E7CD2, 0x83E13D2C,
    0xF8D4474F, 0xE39EB970, 0x42C6AE16, 0x993216FA, 0x7B093B5D, 0x98DAFF3C, 0xF718902A, 0x0B1C9CDB,
    0xE58F764B, 0x187636BC, 0x5D7B3BB1, 0xE73DE7DE, 0x92BEC979, 0xCCA6C0B2, 0x304A0979, 0x85AA43D4,
    0x783125BB, 0x6CA8EAA2, 0xE407EAC6, 0x4B5CFC3E, 0x9FBF8C76, 0x15CA20BE, 0xF2CA9FD3, 0x959BD756,
];

// Calculate one round of the checksum.
fn checksum_comp(checksum: u32, value: u32) -> u32 {
    let tmp = checksum ^ value;
    tmp.wrapping_mul(FNV_PRIME) ^ (tmp >> 17)
}

// Decode one 4-byte chunk of the page as a little-endian u32.
fn read_u32_le(word: &[u8]) -> u32 {
    u32::from_le_bytes([word[0], word[1], word[2], word[3]])
}

/// Compute the checksum for a Postgres page.
///
/// The checksum includes the block number (to detect the case where a page is
/// somehow moved to a different location), the page header (excluding the
/// checksum itself), and the page data.
///
/// As in C implementation in Postgres, the checksum attribute on the page is
/// excluded from the calculation and preserved.
///
/// The page is read byte-wise and decoded as little-endian words, so `data`
/// does not need any particular alignment and the result does not depend on
/// the byte order of the host.
///
/// NB: after doing any modifications run `cargo bench`. The baseline on the more
/// or less recent Intel laptop is around 700ns. If it's significantly higher,
/// then it's worth looking into.
///
/// # Arguments
/// * `data` - the page to checksum; only the first 8192 bytes are used
/// * `blkno` - the block number of the page
///
/// # Panics
/// Panics if `data` is shorter than 8192 bytes.
///
/// # Safety
/// The body performs no unsafe operations. The function stays `unsafe` only
/// to keep the signature existing callers were written against; callers are
/// still expected to pass a standard 8 KB Postgres page.
pub unsafe fn pg_checksum_page(data: &[u8], blkno: u32) -> u16 {
    const WORD_BYTES: usize = std::mem::size_of::<u32>();
    const ROW_BYTES: usize = WORD_BYTES * N_SUMS;

    // Bounds-checked once here: a short page panics instead of being read
    // past its end, and everything below works on exactly BLCKSZ bytes.
    let page = &data[..BLCKSZ];
    let (first_row, remaining_rows) = page.split_at(ROW_BYTES);

    let mut sums = CHECKSUM_BASE_OFFSETS;

    // Calculate the checksum of the first 'row' of the page. Do it separately as
    // we do an expensive comparison here, which is not required for the rest of the
    // page. Putting it into the main loop slows it down ~3 times.
    for (j, (sum, word)) in sums
        .iter_mut()
        .zip(first_row.chunks_exact(WORD_BYTES))
        .enumerate()
    {
        let value = read_u32_le(word);
        // Third 32-bit chunk of the page contains the checksum in the lower half
        // of the little-endian word, which we need to zero out.
        // See also `PageHeaderData` for reference.
        let value = if j == 2 { value & 0xFFFF_0000 } else { value };
        *sum = checksum_comp(*sum, value);
    }

    // Main checksum calculation loop
    for row in remaining_rows.chunks_exact(ROW_BYTES) {
        for (sum, word) in sums.iter_mut().zip(row.chunks_exact(WORD_BYTES)) {
            *sum = checksum_comp(*sum, read_u32_le(word));
        }
    }

    // Finally, add in two rounds of zeroes for additional mixing
    for _ in 0..2 {
        for sum in sums.iter_mut() {
            *sum = checksum_comp(*sum, 0);
        }
    }

    // Xor fold partial checksums together, and mix in the block number to
    // detect transposed pages.
    let checksum = sums.iter().fold(blkno, |acc, sum| acc ^ sum);

    // Reduce to a uint16 (to fit in the pd_checksum field) with an offset of
    // one. That avoids checksums of zero, which seems like a good idea.
    ((checksum % 65535) + 1) as u16
}
#[cfg(test)]
mod tests {
    use super::{pg_checksum_page, BLCKSZ};

    /// The checksum must not depend on whatever is stored in the checksum
    /// field of the page, but must depend on the block number.
    #[test]
    fn page_with_and_without_checksum() {
        // Build a page with some content; its checksum field holds pattern
        // bytes rather than a correct checksum.
        let mut page = [0u8; BLCKSZ];
        for (offset, slot) in page.iter_mut().enumerate() {
            *slot = offset as u8;
        }

        // Checksum of the page as is.
        let original = unsafe { pg_checksum_page(&page[..], 0) };

        // Zeroing the checksum field must not change the result.
        page[8..10].copy_from_slice(&[0u8; 2]);
        let with_zeroed_field = unsafe { pg_checksum_page(&page[..], 0) };
        assert_eq!(original, with_zeroed_field);

        // Neither must storing the correct checksum in the field.
        page[8..10].copy_from_slice(&original.to_le_bytes());
        let with_stamped_field = unsafe { pg_checksum_page(&page[..], 0) };
        assert_eq!(original, with_stamped_field);

        // Check that we protect from the page transposition, i.e. page is the
        // same, but in the wrong place.
        let at_other_block = unsafe { pg_checksum_page(&page[..], 1) };
        assert_ne!(original, at_other_block);
    }
}

View File

@@ -38,6 +38,7 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub data_checksums_enabled: Option<bool>,
}
#[serde_as]

View File

@@ -494,6 +494,8 @@ components:
type: string
compaction_threshold:
type: string
data_checksums_enabled:
type: boolean
TenantConfigInfo:
type: object
properties:

View File

@@ -412,6 +412,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
// Turn on data checksums for all new tenants
tenant_conf.data_checksums_enabled = Some(request_data.data_checksums_enabled.unwrap_or(true));
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
Some(humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?);

View File

@@ -473,6 +473,7 @@ pub mod repo_harness {
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
data_checksums_enabled: Some(tenant_conf.data_checksums_enabled),
}
}
}

View File

@@ -38,6 +38,10 @@ pub mod defaults {
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
// Turn off data checksums by default so that old tenants are not affected.
// We turn it on explicitly for all new tenants.
pub const DEFAULT_DATA_CHECKSUMS: bool = false;
}
/// Per-tenant configuration options
@@ -83,6 +87,7 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub data_checksums_enabled: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -105,6 +110,7 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub data_checksums_enabled: Option<bool>,
}
impl TenantConfOpt {
@@ -135,6 +141,9 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
data_checksums_enabled: self
.data_checksums_enabled
.unwrap_or(global_conf.data_checksums_enabled),
}
}
@@ -172,6 +181,9 @@ impl TenantConfOpt {
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(data_checksums_enabled) = other.data_checksums_enabled {
self.data_checksums_enabled = Some(data_checksums_enabled);
}
}
}
@@ -199,6 +211,7 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
data_checksums_enabled: DEFAULT_DATA_CHECKSUMS,
}
}
@@ -229,6 +242,7 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
data_checksums_enabled: defaults::DEFAULT_DATA_CHECKSUMS,
}
}
}

View File

@@ -11,7 +11,7 @@ use crate::tenant_config::TenantConfOpt;
use crate::thread_mgr::ThreadKind;
use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager;
use crate::{thread_mgr, timelines, walreceiver};
use crate::{tenant_config, thread_mgr, timelines, walreceiver};
use crate::{DatadirTimelineImpl, RepositoryImpl};
use anyhow::{bail, Context};
use serde::{Deserialize, Serialize};
@@ -266,7 +266,14 @@ pub fn create_tenant_repository(
Ok(None)
}
Entry::Vacant(v) => {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let data_checksums_enabled = tenant_conf
.data_checksums_enabled
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
let wal_redo_manager = Arc::new(PostgresRedoManager::new(
conf,
data_checksums_enabled,
tenant_id,
));
let repo = timelines::create_repo(
conf,
tenant_conf,
@@ -567,10 +574,16 @@ fn load_local_repo(
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> anyhow::Result<Arc<RepositoryImpl>> {
// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
let mut m = tenants_state::write_tenants();
let tenant = m.entry(tenant_id).or_insert_with(|| {
let data_checksums_enabled = tenant_conf
.data_checksums_enabled
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
let walredo_mgr = PostgresRedoManager::new(conf, data_checksums_enabled, tenant_id);
// Set up an object repository, for actual data storage.
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
@@ -588,8 +601,6 @@ fn load_local_repo(
}
});
// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
tenant.repo.update_tenant_config(tenant_conf)?;
Ok(Arc::clone(&tenant.repo))

View File

@@ -253,6 +253,7 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
.args(&["-D", &initdbpath.to_string_lossy()])
.args(&["-U", &conf.superuser])
.args(&["-E", "utf8"])
.arg("--data-checksums")
.arg("--no-instructions")
// This is only used for a temporary installation that is deleted shortly after,
// so no need to fsync it

View File

@@ -24,7 +24,7 @@
use anyhow::Context;
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{page_is_new, page_set_lsn};
use postgres_ffi::{page_is_new, page_set_checksum, page_set_lsn};
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
@@ -313,6 +313,8 @@ impl<'a, R: Repository> WalIngest<'a, R> {
if !page_is_new(&image) {
page_set_lsn(&mut image, lsn)
}
unsafe { page_set_checksum(&mut image, blk.blkno) };
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?;
} else {

View File

@@ -48,7 +48,8 @@ use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift;
use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset;
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::wal_record_verify_checksum;
use postgres_ffi::{page_verify_checksum, pg_constants, XLogRecord};
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
@@ -131,6 +132,7 @@ lazy_static! {
pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
data_checksums_enabled: bool,
process: Mutex<Option<PostgresRedoProcess>>,
}
@@ -229,11 +231,16 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
pub fn new(
conf: &'static PageServerConf,
data_checksums_enabled: bool,
tenantid: ZTenantId,
) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenantid,
conf,
data_checksums_enabled,
process: Mutex::new(None),
}
}
@@ -268,7 +275,13 @@ impl PostgresRedoManager {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = process
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
.apply_wal_records(
buf_tag,
base_img,
records,
wal_redo_timeout,
self.data_checksums_enabled,
)
.map_err(WalRedoError::IoError);
let end_time = Instant::now();
@@ -619,6 +632,7 @@ impl PostgresRedoProcess {
info!("running initdb in {:?}", datadir.display());
let initdb = Command::new(conf.pg_bin_dir().join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("--data-checksums")
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
@@ -716,6 +730,7 @@ impl PostgresRedoProcess {
base_img: Option<Bytes>,
records: &[(Lsn, ZenithWalRecord)],
wal_redo_timeout: Duration,
data_checksums_enabled: bool,
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
@@ -725,6 +740,15 @@ impl PostgresRedoProcess {
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
// Checksums may not have been stamped for old tenants, so check them only if
// they are enabled (this is controlled by the per-tenant config).
if data_checksums_enabled && !unsafe { page_verify_checksum(&img, tag.blknum) } {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("block {} of relation {} is invalid", tag.blknum, tag.rel),
));
}
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
@@ -733,6 +757,27 @@ impl PostgresRedoProcess {
rec: postgres_rec,
} = rec
{
let xlogrec = XLogRecord::from_buf(postgres_rec).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"could not deserialize WAL record for relation {} at LSN {}: {}",
tag.rel, lsn, e
),
)
})?;
// WAL records always have a checksum, check it before sending to redo process.
// It doesn't do these checks itself.
if !wal_record_verify_checksum(&xlogrec, postgres_rec) {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"WAL record for relation {} at LSN {} is invalid",
tag.rel, lsn
),
));
}
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(Error::new(

View File

@@ -37,7 +37,7 @@ You can run all the tests with:
If you want to run all the tests in a particular file:
`./scripts/pytest test_pgbench.py`
`./scripts/pytest test_runner/batch_others/test_restart_compute.py`
If you want to run all tests that have the string "bench" in their names:

View File

@@ -682,7 +682,7 @@ class ProposerPostgres(PgProtocol):
def initdb(self):
""" Run initdb """
args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path()]
args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path(), "--data-checksums"]
self.pg_bin.run(args)
def start(self):