mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Add safety notes, benchmark. Optimize checksum calculation
This commit is contained in:
@@ -427,7 +427,7 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
data_checksums: Some(true),
|
||||
data_checksums_enabled: Some(true),
|
||||
})
|
||||
.send()?
|
||||
.error_from_body()?
|
||||
|
||||
@@ -60,13 +60,17 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
|
||||
|
||||
/// Calculate page checksum and stamp it onto the page.
|
||||
/// NB: this will zero out and ignore any existing checksum.
|
||||
pub fn page_set_checksum(page: &mut [u8], blkno: u32) {
|
||||
/// # Safety
|
||||
/// See safety notes for `pg_checksum_page`
|
||||
pub unsafe fn page_set_checksum(page: &mut [u8], blkno: u32) {
|
||||
let checksum = pg_checksum_page(page, blkno);
|
||||
page[8..10].copy_from_slice(&checksum.to_le_bytes());
|
||||
}
|
||||
|
||||
/// Check if page checksum is valid.
|
||||
pub fn page_verify_checksum(page: &[u8], blkno: u32) -> bool {
|
||||
/// # Safety
|
||||
/// See safety notes for `pg_checksum_page`
|
||||
pub unsafe fn page_verify_checksum(page: &[u8], blkno: u32) -> bool {
|
||||
let checksum = pg_checksum_page(page, blkno);
|
||||
checksum == u16::from_le_bytes(page[8..10].try_into().unwrap())
|
||||
}
|
||||
@@ -86,7 +90,7 @@ mod tests {
|
||||
}
|
||||
|
||||
// Calculate the checksum.
|
||||
let checksum = pg_checksum_page(&page[..], 0);
|
||||
let checksum = unsafe { pg_checksum_page(&page[..], 0) };
|
||||
|
||||
// Sanity check: random bytes in the checksum attribute should not be
|
||||
// a valid checksum.
|
||||
@@ -96,12 +100,12 @@ mod tests {
|
||||
);
|
||||
|
||||
// Set the actual checksum.
|
||||
page_set_checksum(&mut page, 0);
|
||||
unsafe { page_set_checksum(&mut page, 0) };
|
||||
|
||||
// Verify the checksum.
|
||||
assert!(page_verify_checksum(&page[..], 0));
|
||||
assert!(unsafe { page_verify_checksum(&page[..], 0) });
|
||||
|
||||
// Checksum is not valid with another block number.
|
||||
assert!(!page_verify_checksum(&page[..], 1));
|
||||
assert!(!unsafe { page_verify_checksum(&page[..], 1) });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#![allow(unused)]
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
|
||||
use utils::pg_checksum_page::pg_checksum_page;
|
||||
use utils::zid;
|
||||
|
||||
pub fn bench_zid_stringify(c: &mut Criterion) {
|
||||
@@ -18,5 +18,20 @@ pub fn bench_zid_stringify(c: &mut Criterion) {
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_zid_stringify);
|
||||
// NB: adding `black_box` around arguments doesn't seem to change anything.
|
||||
pub fn pg_checksum_page_basic(c: &mut Criterion) {
|
||||
const BLCKSZ: usize = 8192;
|
||||
let mut page: [u8; BLCKSZ] = [0; BLCKSZ];
|
||||
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ) {
|
||||
*byte = i as u8;
|
||||
}
|
||||
|
||||
c.bench_function("pg_checksum_page_basic", |b| {
|
||||
b.iter(|| {
|
||||
unsafe { pg_checksum_page(&page[..], 0) };
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, pg_checksum_page_basic, bench_zid_stringify);
|
||||
criterion_main!(benches);
|
||||
|
||||
@@ -1,16 +1,20 @@
|
||||
///
|
||||
/// Port of Postgres pg_checksum_page
|
||||
/// Rust implementation of Postgres pg_checksum_page
|
||||
/// See: https://github.com/postgres/postgres/blob/88210542106de5b26fe6aa088d1811b68502d224/src/include/storage/checksum_impl.h
|
||||
/// for additional comments.
|
||||
///
|
||||
/// This is not a direct port of pg_checksum_page from Postgres, though.
|
||||
/// For example, in the current state it can only produce a valid result
|
||||
/// on the little-endian platform and with the standard 8 KB page size.
|
||||
///
|
||||
|
||||
const BLCKSZ: usize = 8192;
|
||||
const N_SUMS: usize = 32;
|
||||
/* prime multiplier of FNV-1a hash */
|
||||
// Prime multiplier of FNV-1a hash
|
||||
const FNV_PRIME: u32 = 16777619;
|
||||
|
||||
/*
|
||||
* Base offsets to initialize each of the parallel FNV hashes into a
|
||||
* different initial state.
|
||||
*/
|
||||
// Base offsets to initialize each of the parallel FNV hashes into a
|
||||
// different initial state.
|
||||
const CHECKSUM_BASE_OFFSETS: [u32; N_SUMS] = [
|
||||
0x5B1F36E9, 0xB8525960, 0x02AB50AA, 0x1DE66D2A, 0x79FF467A, 0x9BB9F8A3, 0x217E7CD2, 0x83E13D2C,
|
||||
0xF8D4474F, 0xE39EB970, 0x42C6AE16, 0x993216FA, 0x7B093B5D, 0x98DAFF3C, 0xF718902A, 0x0B1C9CDB,
|
||||
@@ -18,68 +22,80 @@ const CHECKSUM_BASE_OFFSETS: [u32; N_SUMS] = [
|
||||
0x783125BB, 0x6CA8EAA2, 0xE407EAC6, 0x4B5CFC3E, 0x9FBF8C76, 0x15CA20BE, 0xF2CA9FD3, 0x959BD756,
|
||||
];
|
||||
|
||||
/*
|
||||
* Calculate one round of the checksum.
|
||||
*/
|
||||
// Calculate one round of the checksum.
|
||||
fn checksum_comp(checksum: u32, value: u32) -> u32 {
|
||||
let tmp = checksum ^ value;
|
||||
tmp.wrapping_mul(FNV_PRIME) ^ (tmp >> 17)
|
||||
}
|
||||
|
||||
/*
|
||||
* Compute the checksum for a Postgres page.
|
||||
*
|
||||
* The page must be adequately aligned (at least on a 4-byte boundary).
|
||||
* Beware also that the checksum field of the page is transiently zeroed.
|
||||
*
|
||||
* The checksum includes the block number (to detect the case where a page is
|
||||
* somehow moved to a different location), the page header (excluding the
|
||||
* checksum itself), and the page data.
|
||||
*
|
||||
* As in C implementation in Postgres, the checksum attribute on the page is
|
||||
* excluded from the calculation and preserved.
|
||||
*/
|
||||
pub fn pg_checksum_page(data: &[u8], blkno: u32) -> u16 {
|
||||
let page = unsafe { std::mem::transmute::<&[u8], &[u32]>(data) };
|
||||
/// Compute the checksum for a Postgres page.
|
||||
///
|
||||
/// The page must be adequately aligned (at least on a 4-byte boundary).
|
||||
///
|
||||
/// The checksum includes the block number (to detect the case where a page is
|
||||
/// somehow moved to a different location), the page header (excluding the
|
||||
/// checksum itself), and the page data.
|
||||
///
|
||||
/// As in C implementation in Postgres, the checksum attribute on the page is
|
||||
/// excluded from the calculation and preserved.
|
||||
///
|
||||
/// NB: after doing any modifications run `cargo bench`. The baseline on the more
|
||||
/// or less recent Intel laptop is around 700ns. If it's significantly higher,
|
||||
/// then it's worth looking into.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `data` - the page to checksum
|
||||
/// * `blkno` - the block number of the page
|
||||
///
|
||||
/// # Safety
|
||||
/// This function is safe to call only if:
|
||||
/// * `data` is strictly a standard 8 KB Postgres page
|
||||
/// * it's called on the little-endian platform
|
||||
pub unsafe fn pg_checksum_page(data: &[u8], blkno: u32) -> u16 {
|
||||
let page = std::mem::transmute::<&[u8], &[u32]>(data);
|
||||
let mut checksum: u32 = 0;
|
||||
let mut sums = CHECKSUM_BASE_OFFSETS;
|
||||
|
||||
/* main checksum calculation */
|
||||
for i in 0..(BLCKSZ / (4 * N_SUMS)) {
|
||||
for (j, sum) in sums.iter_mut().enumerate().take(N_SUMS) {
|
||||
let chunk_i = i * N_SUMS + j;
|
||||
let chunk: u32;
|
||||
if chunk_i == 2 {
|
||||
let mut chunk_copy = page[chunk_i].to_le_bytes();
|
||||
chunk_copy[0] = 0;
|
||||
chunk_copy[1] = 0;
|
||||
chunk = u32::from_le_bytes(chunk_copy);
|
||||
} else {
|
||||
chunk = page[chunk_i];
|
||||
}
|
||||
// Calculate the checksum of the first 'row' of the page. Do it separately as
|
||||
// we do an expensive comparison here, which is not required for the rest of the
|
||||
// page. Putting it into the main loop slows it down ~3 times.
|
||||
for (j, sum) in sums.iter_mut().enumerate().take(N_SUMS) {
|
||||
// Third 32-bit chunk of the page contains the checksum in the lower half
|
||||
// (assuming we are on little-endian machine), which we need to zero out.
|
||||
// See also `PageHeaderData` for reference.
|
||||
let chunk: u32 = if j == 2 {
|
||||
page[j] & 0xFFFF_0000
|
||||
} else {
|
||||
page[j]
|
||||
};
|
||||
|
||||
*sum = checksum_comp(*sum, chunk);
|
||||
*sum = checksum_comp(*sum, chunk);
|
||||
}
|
||||
|
||||
// Main checksum calculation loop
|
||||
for i in 1..(BLCKSZ / (4 * N_SUMS)) {
|
||||
for (j, sum) in sums.iter_mut().enumerate().take(N_SUMS) {
|
||||
*sum = checksum_comp(*sum, page[i * N_SUMS + j]);
|
||||
}
|
||||
}
|
||||
/* finally add in two rounds of zeroes for additional mixing */
|
||||
|
||||
// Finally, add in two rounds of zeroes for additional mixing
|
||||
for _i in 0..2 {
|
||||
for s in sums.iter_mut().take(N_SUMS) {
|
||||
*s = checksum_comp(*s, 0);
|
||||
}
|
||||
}
|
||||
|
||||
/* xor fold partial checksums together */
|
||||
// Xor fold partial checksums together
|
||||
for sum in sums {
|
||||
checksum ^= sum;
|
||||
}
|
||||
|
||||
/* Mix in the block number to detect transposed pages */
|
||||
// Mix in the block number to detect transposed pages
|
||||
checksum ^= blkno;
|
||||
|
||||
/*
|
||||
* Reduce to a uint16 (to fit in the pd_checksum field) with an offset of
|
||||
* one. That avoids checksums of zero, which seems like a good idea.
|
||||
*/
|
||||
// Reduce to a uint16 (to fit in the pd_checksum field) with an offset of
|
||||
// one. That avoids checksums of zero, which seems like a good idea.
|
||||
((checksum % 65535) + 1) as u16
|
||||
}
|
||||
|
||||
@@ -96,25 +112,25 @@ mod tests {
|
||||
}
|
||||
|
||||
// Calculate the checksum.
|
||||
let checksum = pg_checksum_page(&page[..], 0);
|
||||
let checksum = unsafe { pg_checksum_page(&page[..], 0) };
|
||||
|
||||
// Zero the checksum attribute on the page.
|
||||
page[8..10].copy_from_slice(&[0u8; 2]);
|
||||
|
||||
// Calculate the checksum again, should be the same.
|
||||
let new_checksum = pg_checksum_page(&page[..], 0);
|
||||
let new_checksum = unsafe { pg_checksum_page(&page[..], 0) };
|
||||
assert_eq!(checksum, new_checksum);
|
||||
|
||||
// Set the correct checksum into the page.
|
||||
page[8..10].copy_from_slice(&checksum.to_le_bytes());
|
||||
|
||||
// Calculate the checksum again, should be the same.
|
||||
let new_checksum = pg_checksum_page(&page[..], 0);
|
||||
let new_checksum = unsafe { pg_checksum_page(&page[..], 0) };
|
||||
assert_eq!(checksum, new_checksum);
|
||||
|
||||
// Check that we protect from the page transposition, i.e. page is the
|
||||
// same, but in the wrong place.
|
||||
let wrong_blockno_checksum = pg_checksum_page(&page[..], 1);
|
||||
let wrong_blockno_checksum = unsafe { pg_checksum_page(&page[..], 1) };
|
||||
assert_ne!(checksum, wrong_blockno_checksum);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ pub struct TenantCreateRequest {
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub data_checksums: Option<bool>,
|
||||
pub data_checksums_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
|
||||
@@ -494,7 +494,7 @@ components:
|
||||
type: string
|
||||
compaction_threshold:
|
||||
type: string
|
||||
data_checksums:
|
||||
data_checksums_enabled:
|
||||
type: boolean
|
||||
TenantConfigInfo:
|
||||
type: object
|
||||
|
||||
@@ -413,7 +413,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
tenant_conf.compaction_threshold = request_data.compaction_threshold;
|
||||
|
||||
// Turn on data checksums for all new tenants
|
||||
tenant_conf.data_checksums = Some(request_data.data_checksums.unwrap_or(true));
|
||||
tenant_conf.data_checksums_enabled = Some(request_data.data_checksums_enabled.unwrap_or(true));
|
||||
|
||||
if let Some(compaction_period) = request_data.compaction_period {
|
||||
tenant_conf.compaction_period =
|
||||
|
||||
@@ -473,7 +473,7 @@ pub mod repo_harness {
|
||||
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
|
||||
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
|
||||
data_checksums: Some(tenant_conf.data_checksums),
|
||||
data_checksums_enabled: Some(tenant_conf.data_checksums_enabled),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ pub struct TenantConf {
|
||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub data_checksums: bool,
|
||||
pub data_checksums_enabled: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -110,7 +110,7 @@ pub struct TenantConfOpt {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Option<Duration>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub data_checksums: Option<bool>,
|
||||
pub data_checksums_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -141,7 +141,9 @@ impl TenantConfOpt {
|
||||
.lagging_wal_timeout
|
||||
.unwrap_or(global_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
|
||||
data_checksums: self.data_checksums.unwrap_or(global_conf.data_checksums),
|
||||
data_checksums_enabled: self
|
||||
.data_checksums_enabled
|
||||
.unwrap_or(global_conf.data_checksums_enabled),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,8 +181,8 @@ impl TenantConfOpt {
|
||||
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
|
||||
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||
}
|
||||
if let Some(data_checksums) = other.data_checksums {
|
||||
self.data_checksums = Some(data_checksums);
|
||||
if let Some(data_checksums_enabled) = other.data_checksums_enabled {
|
||||
self.data_checksums_enabled = Some(data_checksums_enabled);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -209,7 +211,7 @@ impl TenantConf {
|
||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
data_checksums: DEFAULT_DATA_CHECKSUMS,
|
||||
data_checksums_enabled: DEFAULT_DATA_CHECKSUMS,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -240,7 +242,7 @@ impl TenantConf {
|
||||
.unwrap(),
|
||||
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.unwrap(),
|
||||
data_checksums: defaults::DEFAULT_DATA_CHECKSUMS,
|
||||
data_checksums_enabled: defaults::DEFAULT_DATA_CHECKSUMS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,11 +266,14 @@ pub fn create_tenant_repository(
|
||||
Ok(None)
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
let data_checksums = tenant_conf
|
||||
.data_checksums
|
||||
let data_checksums_enabled = tenant_conf
|
||||
.data_checksums_enabled
|
||||
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
|
||||
let wal_redo_manager =
|
||||
Arc::new(PostgresRedoManager::new(conf, data_checksums, tenant_id));
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(
|
||||
conf,
|
||||
data_checksums_enabled,
|
||||
tenant_id,
|
||||
));
|
||||
let repo = timelines::create_repo(
|
||||
conf,
|
||||
tenant_conf,
|
||||
@@ -576,11 +579,11 @@ fn load_local_repo(
|
||||
|
||||
let mut m = tenants_state::write_tenants();
|
||||
let tenant = m.entry(tenant_id).or_insert_with(|| {
|
||||
let data_checksums = tenant_conf
|
||||
.data_checksums
|
||||
let data_checksums_enabled = tenant_conf
|
||||
.data_checksums_enabled
|
||||
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
|
||||
// Set up a WAL redo manager, for applying WAL records.
|
||||
let walredo_mgr = PostgresRedoManager::new(conf, data_checksums, tenant_id);
|
||||
let walredo_mgr = PostgresRedoManager::new(conf, data_checksums_enabled, tenant_id);
|
||||
|
||||
// Set up an object repository, for actual data storage.
|
||||
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
|
||||
|
||||
@@ -313,7 +313,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
if !page_is_new(&image) {
|
||||
page_set_lsn(&mut image, lsn)
|
||||
}
|
||||
page_set_checksum(&mut image, blk.blkno);
|
||||
unsafe { page_set_checksum(&mut image, blk.blkno) };
|
||||
|
||||
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
|
||||
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?;
|
||||
|
||||
@@ -132,7 +132,7 @@ lazy_static! {
|
||||
pub struct PostgresRedoManager {
|
||||
tenantid: ZTenantId,
|
||||
conf: &'static PageServerConf,
|
||||
data_checksums: bool,
|
||||
data_checksums_enabled: bool,
|
||||
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
}
|
||||
@@ -233,14 +233,14 @@ impl PostgresRedoManager {
|
||||
///
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
data_checksums: bool,
|
||||
data_checksums_enabled: bool,
|
||||
tenantid: ZTenantId,
|
||||
) -> PostgresRedoManager {
|
||||
// The actual process is launched lazily, on first request.
|
||||
PostgresRedoManager {
|
||||
tenantid,
|
||||
conf,
|
||||
data_checksums,
|
||||
data_checksums_enabled,
|
||||
process: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
@@ -280,7 +280,7 @@ impl PostgresRedoManager {
|
||||
base_img,
|
||||
records,
|
||||
wal_redo_timeout,
|
||||
self.data_checksums,
|
||||
self.data_checksums_enabled,
|
||||
)
|
||||
.map_err(WalRedoError::IoError);
|
||||
|
||||
@@ -730,7 +730,7 @@ impl PostgresRedoProcess {
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, ZenithWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
data_checksums: bool,
|
||||
data_checksums_enabled: bool,
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
@@ -742,7 +742,7 @@ impl PostgresRedoProcess {
|
||||
if let Some(img) = base_img {
|
||||
// Checksums could be not stamped for old tenants, so check them only if they
|
||||
// are enabled (this is controlled by per-tenant config).
|
||||
if data_checksums && !page_verify_checksum(&img, tag.blknum) {
|
||||
if data_checksums_enabled && !unsafe { page_verify_checksum(&img, tag.blknum) } {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("block {} of relation {} is invalid", tag.blknum, tag.rel),
|
||||
|
||||
Reference in New Issue
Block a user