Support truncate WAL record

Konstantin Knizhnik
2021-04-15 15:50:47 +03:00
parent d9bc2109bb
commit 24b925d528
8 changed files with 169 additions and 52 deletions

View File

@@ -338,6 +338,7 @@ impl ComputeControlPlane<'_> {
shared_buffers = 1MB\n\
max_connections = 100\n\
wal_level = replica\n\
max_parallel_workers = 0\n\
wal_sender_timeout = 0\n\
listen_addresses = '{address}'\n\
port = {port}\n\
@@ -396,6 +397,7 @@ impl ComputeControlPlane<'_> {
shared_buffers = 1MB\n\
max_connections = 100\n\
wal_level = replica\n\
max_parallel_workers = 0\n\
listen_addresses = '{address}'\n\
port = {port}\n\
computenode_mode = true\n\

View File

@@ -222,10 +222,11 @@ fn init_logging(conf: &PageServerConf) -> slog_scope::GlobalLoggerGuard {
tui::init_logging()
} else if conf.daemonize {
let log = conf.data_dir.join("pageserver.log");
let log_file = OpenOptions::new()
.create(true)
.append(true)
.open(log)
.unwrap_or_else(|_| panic!("Could not create log file"));
let decorator = slog_term::PlainSyncDecorator::new(log_file);
let drain = slog_term::CompactFormat::new(decorator).build();
let drain = slog::Filter::new(drain, |record: &slog::Record| {

View File

@@ -368,13 +368,14 @@ impl PageCache {
shared = wait_result.0;
if wait_result.1.timed_out() {
error!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
);
return Err(format!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
))?;
}
}
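The {:X}/{:X} pattern in the error messages above splits the 64-bit LSN into its high and low 32-bit halves, matching PostgreSQL's usual X/X hex notation for LSNs. A standalone sketch of that arithmetic (the LSN value is made up):

fn format_lsn(lsn: u64) -> String {
    // High 32 bits before the slash, low 32 bits after, both in hex.
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff)
}

fn main() {
    assert_eq!(format_lsn(0x0000_0001_6000_0028), "1/60000028");
}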
@@ -383,8 +384,11 @@ impl PageCache {
}
if lsn < shared.first_valid_lsn {
return Err(format!("LSN {:X}/{:X} has already been removed",
lsn >> 32, lsn & 0xffff_ffff))?;
return Err(format!(
"LSN {:X}/{:X} has already been removed",
lsn >> 32,
lsn & 0xffff_ffff
))?;
}
}
let mut buf = BytesMut::new();
@@ -439,7 +443,7 @@ impl PageCache {
self.put_page_image(tag, lsn, page_img.clone());
} else {
// No base image, and no WAL record. Huh?
panic!("no page image or WAL record for requested page");
}
// FIXME: assumes little-endian. Only used for the debugging log though
@@ -554,6 +558,41 @@ impl PageCache {
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
//
// Adds a relation-wide WAL record (like truncate) to the page cache,
// associating it with all pages starting at the specified block number
//
pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) {
let mut key = CacheKey { tag, lsn: rec.lsn };
let old_rel_size = self.relsize_get(&tag.rel);
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
apply_pending: false,
};
// set new relation size
self.shared
.lock()
.unwrap()
.relsize_cache
.insert(tag.rel, tag.blknum);
let mut key_buf = BytesMut::new();
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
for blknum in tag.blknum..old_rel_size {
key_buf.clear();
key.tag.blknum = blknum;
key.pack(&mut key_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
}
let n = (old_rel_size - tag.blknum) as u64;
self.num_entries.fetch_add(n, Ordering::Relaxed);
self.num_wal_records.fetch_add(n, Ordering::Relaxed);
}
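To illustrate the fan-out put_rel_wal_record performs, here is a minimal standalone sketch: a BTreeMap stands in for the RocksDB store, the key is simplified to (relnode, blknum, lsn), and all values are invented. Truncating a 10-block relation down to 4 blocks stores the same record under 6 per-block keys.

use std::collections::BTreeMap;

// Simplified stand-ins: the real code packs a CacheKey and writes to RocksDB.
type Key = (u32, u32, u64); // (relnode, blknum, lsn)

fn fan_out_rel_record(
    store: &mut BTreeMap<Key, Vec<u8>>,
    relnode: u32,
    new_size: u32, // truncate target: first block that is cut off
    old_size: u32, // relation size before the truncate
    lsn: u64,
    payload: &[u8],
) -> u64 {
    // Same loop shape as put_rel_wal_record: one entry per truncated block.
    for blknum in new_size..old_size {
        store.insert((relnode, blknum, lsn), payload.to_vec());
    }
    (old_size - new_size) as u64
}

fn main() {
    let mut store = BTreeMap::new();
    // Truncating a 10-block relation down to 4 blocks touches blocks 4..=9.
    let n = fan_out_rel_record(&mut store, 16384, 4, 10, 0x1_6000_0028, b"truncate record");
    assert_eq!(n, 6);
    assert_eq!(store.len(), 6);
}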
//
// Memorize a full image of a page version
//
@@ -584,14 +623,18 @@ impl PageCache {
// Can't move backwards.
//assert!(lsn >= shared.last_valid_lsn);
if lsn > shared.last_valid_lsn {
shared.last_valid_lsn = lsn;
self.valid_lsn_condvar.notify_all();
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
} else {
trace!(
"lsn={}, shared.last_valid_lsn={}",
lsn,
shared.last_valid_lsn
);
}
}
//
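The timeout branch earlier in this file and the notify_all() call above are the two halves of a standard Mutex-plus-Condvar handshake: readers wait until the last valid LSN reaches their target, and the WAL receiver wakes them when it advances. A minimal, self-contained sketch of that pattern (names are hypothetical, not the page server's actual types):

use std::sync::{Condvar, Mutex};
use std::time::Duration;

struct SharedLsn {
    last_valid_lsn: Mutex<u64>,
    valid_lsn_condvar: Condvar,
}

// Waiter side: block until the shared LSN reaches `target` or the timeout expires.
fn wait_for_lsn(shared: &SharedLsn, target: u64, timeout: Duration) -> Result<u64, String> {
    let guard = shared.last_valid_lsn.lock().unwrap();
    let (lsn, wait_result) = shared
        .valid_lsn_condvar
        .wait_timeout_while(guard, timeout, |lsn| *lsn < target)
        .unwrap();
    if wait_result.timed_out() {
        return Err("timed out waiting for WAL".to_string());
    }
    Ok(*lsn)
}

// WAL receiver side: advance the LSN and wake every waiter.
fn advance_lsn(shared: &SharedLsn, lsn: u64) {
    let mut guard = shared.last_valid_lsn.lock().unwrap();
    if lsn > *guard {
        *guard = lsn;
        shared.valid_lsn_condvar.notify_all();
    }
}

fn main() {
    let shared = SharedLsn {
        last_valid_lsn: Mutex::new(0),
        valid_lsn_condvar: Condvar::new(),
    };
    advance_lsn(&shared, 42);
    assert_eq!(wait_for_lsn(&shared, 42, Duration::from_millis(10)), Ok(42));
}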

View File

@@ -234,7 +234,7 @@ pub fn thread_main(conf: PageServerConf) {
loop {
let (socket, peer_addr) = listener.accept().await.unwrap();
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true).unwrap();
let mut conn_handler = Connection::new(conf.clone(), socket);
task::spawn(async move {
@@ -318,7 +318,7 @@ impl Connection {
.await?;
self.stream.write_i16(1).await?;
self.stream.write_buf(&mut b).await?;
self.stream.write_all(&mut b).await?;
self.stream.write_i32(0).await?; /* table oid */
self.stream.write_i16(0).await?; /* attnum */
self.stream.write_i32(25).await?; /* TEXTOID */
@@ -337,7 +337,7 @@ impl Connection {
self.stream.write_i16(1).await?;
self.stream.write_i32(b.len() as i32).await?;
self.stream.write_buf(&mut b).await?;
self.stream.write_all(&mut b).await?;
}
BeMessage::ControlFile => {
@@ -349,7 +349,7 @@ impl Connection {
self.stream.write_i16(1).await?;
self.stream.write_i32(b.len() as i32).await?;
self.stream.write_buf(&mut b).await?;
self.stream.write_all(&mut b).await?;
}
BeMessage::CommandComplete => {
@@ -357,7 +357,7 @@ impl Connection {
self.stream.write_u8(b'C').await?;
self.stream.write_i32(4 + b.len() as i32).await?;
self.stream.write_buf(&mut b).await?;
self.stream.write_all(&mut b).await?;
}
BeMessage::ZenithStatusResponse(resp) => {
@@ -384,7 +384,7 @@ impl Connection {
self.stream.write_u8(102).await?; /* tag from pagestore_client.h */
self.stream.write_u8(resp.ok as u8).await?;
self.stream.write_u32(resp.n_blocks).await?;
self.stream.write_buf(&mut resp.page.clone()).await?;
self.stream.write_all(&mut resp.page.clone()).await?;
}
}
@@ -405,7 +405,7 @@ impl Connection {
match m.kind {
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
let mut b = Bytes::from("N");
self.stream.write_buf(&mut b).await?;
self.stream.write_all(&mut b).await?;
self.stream.flush().await?;
}
StartupRequestCode::Normal => {
@@ -508,11 +508,11 @@ impl Connection {
loop {
let message = self.read_message().await?;
/*
if let Some(m) = &message {
trace!("query({}): {:?}", sysid, m);
};
*/
if message.is_none() {
// connection was closed
return Ok(());

View File

@@ -287,15 +287,22 @@ pub struct DecodedBkpBlock {
const SizeOfXLogRecord: u32 = 24;
pub struct DecodedWALRecord {
pub lsn: u64, // LSN at the *end* of the record
pub xl_info: u8,
pub xl_rmid: u8,
pub record: Bytes, // raw XLogRecord
pub blocks: Vec<DecodedBkpBlock>,
}
// From pg_control.h and rmgrlist.h
const XLOG_SWITCH: u8 = 0x40;
const RM_XLOG_ID: u8 = 0;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
pub const RM_XLOG_ID: u8 = 0;
pub const RM_XACT_ID: u8 = 1;
pub const RM_SMGR_ID: u8 = 2;
// Is this record an XLOG_SWITCH record? They need some special processing,
// so we need to check for that before the rest of the parsing.
@@ -316,25 +323,61 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
return xl_info == XLOG_SWITCH && xl_rmid == RM_XLOG_ID;
}
pub type Oid = u32;
pub type BlockNumber = u32;
pub const MAIN_FORKNUM: u8 = 0;
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
#[repr(C)]
#[derive(Debug)]
pub struct RelFileNode {
pub spcnode: Oid, /* tablespace */
pub dbnode: Oid, /* database */
pub relnode: Oid, /* relation */
}
#[repr(C)]
#[derive(Debug)]
pub struct XlSmgrTruncate {
pub blkno: BlockNumber,
pub rnode: RelFileNode,
pub flags: u32,
}
pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate {
let mut buf = decoded.record.clone();
buf.advance(SizeOfXLogRecord as usize);
XlSmgrTruncate {
blkno: buf.get_u32_le(),
rnode: RelFileNode {
spcnode: buf.get_u32_le(), /* tablespace */
dbnode: buf.get_u32_le(), /* database */
relnode: buf.get_u32_le(), /* relation */
},
flags: buf.get_u32_le(),
}
}
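For reference, a standalone sketch of the byte layout this decoder assumes: a fixed 24-byte XLogRecord header (SizeOfXLogRecord above) followed by blkno, the RelFileNode triple, and flags, all little-endian. The field values below are invented and the zeroed header is only a placeholder:

use bytes::{Buf, BufMut, Bytes, BytesMut};

fn main() {
    // Build a fake record: zeroed 24-byte header, then the xl_smgr_truncate payload.
    let mut buf = BytesMut::new();
    buf.extend_from_slice(&[0u8; 24]); // stand-in for the XLogRecord header
    buf.put_u32_le(4);      // blkno: new relation size in blocks
    buf.put_u32_le(1663);   // spcnode (tablespace)
    buf.put_u32_le(12345);  // dbnode (database)
    buf.put_u32_le(16384);  // relnode (relation)
    buf.put_u32_le(0x0001); // flags: SMGR_TRUNCATE_HEAP
    let record: Bytes = buf.freeze();

    // Decode it the same way decode_truncate_record does.
    let mut r = record.clone();
    r.advance(24);
    let blkno = r.get_u32_le();
    let spcnode = r.get_u32_le();
    let dbnode = r.get_u32_le();
    let relnode = r.get_u32_le();
    let flags = r.get_u32_le();
    assert_eq!((blkno, spcnode, dbnode, relnode, flags), (4, 1663, 12345, 16384, 0x0001));
}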
//
// Routines to decode a WAL record and figure out which blocks are modified
//
pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
pub fn decode_wal_record(lsn: u64, record: Bytes) -> DecodedWALRecord {
trace!(
"decoding record with LSN {:08X}/{:08X} ({} bytes)",
lsn >> 32,
lsn & 0xffff_ffff,
rec.remaining()
record.remaining()
);
let mut buf = rec.clone();
let mut buf = record.clone();
// FIXME: assume little-endian here
let xl_tot_len = buf.get_u32_le();
let _xl_xid = buf.get_u32_le();
let _xl_prev = buf.get_u64_le();
let _xl_info = buf.get_u8();
let _xl_rmid = buf.get_u8();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
@@ -582,8 +625,10 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
// Since we don't care about the data payloads here, we're done.
return DecodedWALRecord {
lsn: lsn,
record: rec,
blocks: blocks,
lsn,
xl_info,
xl_rmid,
record,
blocks,
};
}

View File

@@ -13,7 +13,7 @@ use tokio_stream::StreamExt;
use crate::page_cache;
use crate::page_cache::{BufferTag, RelTag};
use crate::waldecoder::WalStreamDecoder;
use crate::waldecoder::*;
use crate::PageServerConf;
use postgres_protocol::message::backend::ReplicationMessage;
@@ -158,7 +158,29 @@ async fn walreceiver_main(
pcache.put_wal_record(tag, rec);
}
// Include the truncate WAL record in all truncated pages
if decoded.xl_rmid == RM_SMGR_ID
&& (decoded.xl_info & XLR_RMGR_INFO_MASK) == XLOG_SMGR_TRUNCATE
{
let truncate = decode_truncate_record(&decoded);
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
let tag = BufferTag {
rel: RelTag {
spcnode: truncate.rnode.spcnode,
dbnode: truncate.rnode.dbnode,
relnode: truncate.rnode.relnode,
forknum: MAIN_FORKNUM,
},
blknum: truncate.blkno,
};
let rec = page_cache::WALRecord {
lsn: lsn,
will_init: false,
rec: recdata.clone(),
};
pcache.put_rel_wal_record(tag, rec);
}
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
pcache.advance_last_valid_lsn(lsn);
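The filter in the hunk above keys off two things: the record's resource manager id and the rmgr-specific bits of xl_info, which live in the high nibble (hence the XLR_RMGR_INFO_MASK mask). A small self-contained sketch of that check, with a hypothetical helper name:

// Constants as defined in waldecoder.rs (mirroring pg_control.h / rmgrlist.h).
const RM_SMGR_ID: u8 = 2;
const XLOG_SMGR_TRUNCATE: u8 = 0x20;
const XLR_RMGR_INFO_MASK: u8 = 0xF0;

// Hypothetical helper: is this decoded record an smgr TRUNCATE?
fn is_smgr_truncate(xl_rmid: u8, xl_info: u8) -> bool {
    xl_rmid == RM_SMGR_ID && (xl_info & XLR_RMGR_INFO_MASK) == XLOG_SMGR_TRUNCATE
}

fn main() {
    assert!(is_smgr_truncate(RM_SMGR_ID, 0x20));
    // Low-order xl_info bits are generic XLR flags and must not affect the match.
    assert!(is_smgr_truncate(RM_SMGR_ID, 0x21));
    // The same info value under a different resource manager is not a truncate.
    assert!(!is_smgr_truncate(0, 0x20));
}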

View File

@@ -67,7 +67,7 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) {
let _guard = runtime.enter();
process = WalRedoProcess::launch(&datadir, &runtime).unwrap();
}
info!("WAL redo postgres started");
// Pretty arbitrarily, reuse the same Postgres process for 100 requests.
// After that, kill it and start a new one. This is mostly to avoid
@@ -79,7 +79,7 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) {
let result = handle_apply_request(&pcache, &process, &runtime, request);
if result.is_err() {
// On error, kill the process.
error!("Kill wal redo process on error");
break;
}
}
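The loop above implements the policy mentioned earlier in this file: reuse one WAL-redo process for a bounded number of requests, and restart it whenever a request fails. A minimal standalone sketch of that lifecycle (all types and numbers are stand-ins, not the actual WalRedoProcess API):

const MAX_REQUESTS_PER_PROCESS: usize = 100;

struct RedoProcess; // stand-in for the real WalRedoProcess

impl RedoProcess {
    fn launch() -> RedoProcess {
        RedoProcess
    }
    fn handle_request(&self, req: u32) -> Result<(), ()> {
        // Pretend every 250th request fails, to exercise the restart path.
        if req % 250 == 249 { Err(()) } else { Ok(()) }
    }
}

fn main() {
    let mut requests = 0..1000u32;
    'outer: loop {
        let process = RedoProcess::launch();
        // Reuse one process for a bounded number of requests, then drop it.
        for _ in 0..MAX_REQUESTS_PER_PROCESS {
            match requests.next() {
                Some(req) => {
                    if process.handle_request(req).is_err() {
                        // On error, drop this process; the outer loop launches a fresh one.
                        continue 'outer;
                    }
                }
                None => break 'outer,
            }
        }
    }
}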
@@ -102,7 +102,7 @@ fn handle_apply_request(
let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
let mut entry = entry_rc.content.lock().unwrap();
assert!(entry.apply_pending);
entry.apply_pending = false;
let nrecords = records.len();
@@ -161,14 +161,18 @@ impl WalRedoProcess {
.expect("failed to execute initdb");
if !initdb.status.success() {
panic!("initdb failed: {}\nstderr:\n{}",
std::str::from_utf8(&initdb.stdout).unwrap(),
std::str::from_utf8(&initdb.stderr).unwrap());
panic!(
"initdb failed: {}\nstderr:\n{}",
std::str::from_utf8(&initdb.stdout).unwrap(),
std::str::from_utf8(&initdb.stderr).unwrap()
);
} else {
// Limit shared cache for wal-redo-postgres
let mut config = OpenOptions::new()
.append(true)
.open(datadir.join("postgresql.conf"))?;
config.write(b"shared_buffers=128kB\n")?;
}
// Start postgres itself
let mut child = Command::new("postgres")
.arg("--wal-redo")