diff --git a/Cargo.lock b/Cargo.lock index 5da0e9c179..7560ef98e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2571,7 +2571,7 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" [[package]] name = "yakv" -version = "0.1.9" +version = "0.2.0" dependencies = [ "anyhow", "crc32c", diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index d9027f31f6..2fd17fa063 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -294,7 +294,7 @@ impl PostgresNode { conf.append("max_wal_senders", "10"); // wal_log_hints is mandatory when running against pageserver (see gh issue#192) // TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE? - conf.append("wal_log_hints", "on"); +// conf.append("wal_log_hints", "on"); conf.append("max_replication_slots", "10"); conf.append("hot_standby", "on"); conf.append("shared_buffers", "1MB"); diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 3c30d6c578..6a4387a4d1 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -38,7 +38,7 @@ const_format = "0.2.21" tracing = "0.1.27" signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } yakv = { path = "../../yakv" } -#yakv = "0.1.9" +#yakv = "0.2.0" lz4_flex = "0.9.0" postgres_ffi = { path = "../postgres_ffi" } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 48be565b27..db3c820009 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -37,7 +37,7 @@ pub mod defaults { pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); pub const DEFAULT_GC_HORIZON: u64 = 1024 * 1024; - pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10000); + pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index a57a6e6cc4..41edcee164 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -11,7 +11,7 @@ use std::io::{Read, Seek, SeekFrom}; use std::path::{Path, PathBuf}; use anyhow::{anyhow, bail, Result}; -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use tracing::*; use crate::relish::*; @@ -416,7 +416,6 @@ pub fn save_decoded_record( if checkpoint.update_next_xid(decoded.xl_xid) { *checkpoint_modified = true; } - // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. for blk in decoded.blocks.iter() { @@ -426,14 +425,38 @@ pub fn save_decoded_record( relnode: blk.rnode_relnode, forknum: blk.forknum as u8, }); + if blk.apply_image + && blk.has_image + && decoded.xl_rmid == pg_constants::RM_XLOG_ID + && (decoded.xl_info == pg_constants::XLOG_FPI + || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) + { + // Extract page image from FPI record + let img_len = blk.bimg_len as usize; + let img_offs = blk.bimg_offset as usize; + let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); + image.extend_from_slice(&recdata[img_offs..img_offs + img_len]); - let rec = WALRecord { - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; + // Compression of WAL is not yet supported + assert!((blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0); - timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; + if blk.hole_length != 0 { + let tail = image.split_off(blk.hole_offset as usize); + image.resize(image.len() + blk.hole_length as usize, 0u8); + image.unsplit(tail); + } + image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); + image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); + assert_eq!(image.len(), pg_constants::BLCKSZ as usize); + timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?; + } else { + let rec = WALRecord { + will_init: blk.will_init || blk.apply_image, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; + } } let mut buf = decoded.record.clone(); diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs index bef7173b42..6c6d0f9ef1 100644 --- a/pageserver/src/toast_store.rs +++ b/pageserver/src/toast_store.rs @@ -9,7 +9,7 @@ use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value}; const TOAST_SEGMENT_SIZE: usize = 2 * 1024; const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024; const CACHE_SIZE: usize = 32 * 1024; // 256Mb -const COMMIT_THRESHOLD: usize = CACHE_SIZE / 4; +const COMMIT_THRESHOLD: usize = CACHE_SIZE / 2; const WAL_FLUSH_THRESHOLD: u32 = 128; // 1Mb /// @@ -126,7 +126,7 @@ impl ToastStore { Ok(ToastStore { db: Storage::open( &path.join("pageserver.db"), - None, // Some(&path.join("pageserver.log")), + Some(&path.join("pageserver.log")), StorageConfig { cache_size: CACHE_SIZE, checkpoint_interval: CHECKPOINT_INTERVAL, diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index b1e8e3b54f..20e04bcad9 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -229,17 +229,18 @@ pub struct DecodedBkpBlock { pub blkno: u32, /* copy of the fork_flags field from the XLogRecordBlockHeader */ - flags: u8, + pub flags: u8, /* Information on full-page image, if any */ - has_image: bool, /* has image, even for consistency checking */ + pub has_image: bool, /* has image, even for consistency checking */ pub apply_image: bool, /* has image that should be restored */ pub will_init: bool, /* record doesn't need previous page version to apply */ //char *bkp_image; - hole_offset: u16, - hole_length: u16, - bimg_len: u16, - bimg_info: u8, + pub hole_offset: u16, + pub hole_length: u16, + pub bimg_offset: u32, + pub bimg_len: u16, + pub bimg_info: u8, /* Buffer holding the rmgr-specific data associated with this block */ has_data: bool, @@ -859,8 +860,19 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } // 3. Decode blocks. + let mut ptr = record.len() - buf.remaining(); + for blk in blocks.iter_mut() { + if blk.has_image { + blk.bimg_offset = ptr as u32; + ptr += blk.bimg_len as usize; + } + if blk.has_data { + ptr += blk.data_len as usize; + } + } // We don't need them, so just skip blocks_total_len bytes buf.advance(blocks_total_len as usize); + assert_eq!(ptr, record.len() - buf.remaining()); let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;