Save materialized pages

This commit is contained in:
Konstantin Knizhnik
2021-11-17 12:16:02 +03:00
parent ee29446edc
commit 6311135d73
7 changed files with 55 additions and 20 deletions

2
Cargo.lock generated
View File

@@ -2571,7 +2571,7 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.1.9"
version = "0.2.0"
dependencies = [
"anyhow",
"crc32c",

View File

@@ -294,7 +294,7 @@ impl PostgresNode {
conf.append("max_wal_senders", "10");
// wal_log_hints is mandatory when running against pageserver (see gh issue#192)
// TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE?
conf.append("wal_log_hints", "on");
// conf.append("wal_log_hints", "on");
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");

View File

@@ -38,7 +38,7 @@ const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
yakv = { path = "../../yakv" }
#yakv = "0.1.9"
#yakv = "0.2.0"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -37,7 +37,7 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_HORIZON: u64 = 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10000);
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;

View File

@@ -11,7 +11,7 @@ use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Result};
use bytes::{Buf, Bytes};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::relish::*;
@@ -416,7 +416,6 @@ pub fn save_decoded_record(
if checkpoint.update_next_xid(decoded.xl_xid) {
*checkpoint_modified = true;
}
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
@@ -426,14 +425,38 @@ pub fn save_decoded_record(
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
});
if blk.apply_image
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
{
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize);
image.extend_from_slice(&recdata[img_offs..img_offs + img_len]);
let rec = WALRecord {
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
// Compression of WAL is not yet supported
assert!((blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0);
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);
image.resize(image.len() + blk.hole_length as usize, 0u8);
image.unsplit(tail);
}
image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?;
} else {
let rec = WALRecord {
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
}
}
let mut buf = decoded.record.clone();

View File

@@ -9,7 +9,7 @@ use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value};
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
const COMMIT_THRESHOLD: usize = CACHE_SIZE / 4;
const COMMIT_THRESHOLD: usize = CACHE_SIZE / 2;
const WAL_FLUSH_THRESHOLD: u32 = 128; // 1Mb
///
@@ -126,7 +126,7 @@ impl ToastStore {
Ok(ToastStore {
db: Storage::open(
&path.join("pageserver.db"),
None, // Some(&path.join("pageserver.log")),
Some(&path.join("pageserver.log")),
StorageConfig {
cache_size: CACHE_SIZE,
checkpoint_interval: CHECKPOINT_INTERVAL,

View File

@@ -229,17 +229,18 @@ pub struct DecodedBkpBlock {
pub blkno: u32,
/* copy of the fork_flags field from the XLogRecordBlockHeader */
flags: u8,
pub flags: u8,
/* Information on full-page image, if any */
has_image: bool, /* has image, even for consistency checking */
pub has_image: bool, /* has image, even for consistency checking */
pub apply_image: bool, /* has image that should be restored */
pub will_init: bool, /* record doesn't need previous page version to apply */
//char *bkp_image;
hole_offset: u16,
hole_length: u16,
bimg_len: u16,
bimg_info: u8,
pub hole_offset: u16,
pub hole_length: u16,
pub bimg_offset: u32,
pub bimg_len: u16,
pub bimg_info: u8,
/* Buffer holding the rmgr-specific data associated with this block */
has_data: bool,
@@ -859,8 +860,19 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
// 3. Decode blocks.
let mut ptr = record.len() - buf.remaining();
for blk in blocks.iter_mut() {
if blk.has_image {
blk.bimg_offset = ptr as u32;
ptr += blk.bimg_len as usize;
}
if blk.has_data {
ptr += blk.data_len as usize;
}
}
// We don't need them, so just skip blocks_total_len bytes
buf.advance(blocks_total_len as usize);
assert_eq!(ptr, record.len() - buf.remaining());
let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;