From 24b925d528df65656193e4c63c51909af1b0117e Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 15 Apr 2021 15:50:47 +0300 Subject: [PATCH] Support truncate WAL record --- integration_tests/tests/control_plane/mod.rs | 2 + pageserver/src/bin/pageserver.rs | 5 +- pageserver/src/page_cache.rs | 71 ++++++++++++++++---- pageserver/src/page_service.rs | 24 +++---- pageserver/src/waldecoder.rs | 67 +++++++++++++++--- pageserver/src/walreceiver.rs | 26 ++++++- pageserver/src/walredo.rs | 24 ++++--- vendor/postgres | 2 +- 8 files changed, 169 insertions(+), 52 deletions(-) diff --git a/integration_tests/tests/control_plane/mod.rs b/integration_tests/tests/control_plane/mod.rs index 00ba0c52f1..844c9839cc 100644 --- a/integration_tests/tests/control_plane/mod.rs +++ b/integration_tests/tests/control_plane/mod.rs @@ -338,6 +338,7 @@ impl ComputeControlPlane<'_> { shared_buffers = 1MB\n\ max_connections = 100\n\ wal_level = replica\n\ + max_parallel_workers = 0\n\ wal_sender_timeout = 0\n\ listen_addresses = '{address}'\n\ port = {port}\n\ @@ -396,6 +397,7 @@ impl ComputeControlPlane<'_> { shared_buffers = 1MB\n\ max_connections = 100\n\ wal_level = replica\n\ + max_parallel_workers = 0\n\ listen_addresses = '{address}'\n\ port = {port}\n\ computenode_mode = true\n\ diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index b1ff34afc4..bb920d46de 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -222,10 +222,11 @@ fn init_logging(conf: &PageServerConf) -> slog_scope::GlobalLoggerGuard { tui::init_logging() } else if conf.daemonize { let log = conf.data_dir.join("pageserver.log"); - let log_file = OpenOptions::new() + let log_file = OpenOptions::new() .create(true) .append(true) - .open(log).unwrap_or_else(|_| panic!("Could not create log file")); + .open(log) + .unwrap_or_else(|_| panic!("Could not create log file")); let decorator = slog_term::PlainSyncDecorator::new(log_file); let drain = slog_term::CompactFormat::new(decorator).build(); let drain = slog::Filter::new(drain, |record: &slog::Record| { diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 8c3a9eecf7..5bd24e4e22 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -368,13 +368,14 @@ impl PageCache { shared = wait_result.0; if wait_result.1.timed_out() { - error!( - "Timed out while waiting for WAL record at LSN {} to arrive", + error!( + "Timed out while waiting for WAL record at LSN {} to arrive", lsn - ); + ); return Err(format!( "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, lsn & 0xffff_ffff + lsn >> 32, + lsn & 0xffff_ffff ))?; } } @@ -383,8 +384,11 @@ impl PageCache { } if lsn < shared.first_valid_lsn { - return Err(format!("LSN {:X}/{:X} has already been removed", - lsn >> 32, lsn & 0xffff_ffff))?; + return Err(format!( + "LSN {:X}/{:X} has already been removed", + lsn >> 32, + lsn & 0xffff_ffff + ))?; } } let mut buf = BytesMut::new(); @@ -439,7 +443,7 @@ impl PageCache { self.put_page_image(tag, lsn, page_img.clone()); } else { // No base image, and no WAL record. Huh? - panic!("no page image or WAL record for requested page"); + panic!("no page image or WAL record for requested page"); } // FIXME: assumes little-endian. 
Only used for the debugging log though @@ -554,6 +558,41 @@ impl PageCache { self.num_wal_records.fetch_add(1, Ordering::Relaxed); } + // + // Adds a relation-wide WAL record (like truncate) to the page cache, + // associating it with all pages started with specified block number + // + pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) { + let mut key = CacheKey { tag, lsn: rec.lsn }; + let old_rel_size = self.relsize_get(&tag.rel); + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + apply_pending: false, + }; + // set new relation size + self.shared + .lock() + .unwrap() + .relsize_cache + .insert(tag.rel, tag.blknum); + + let mut key_buf = BytesMut::new(); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + for blknum in tag.blknum..old_rel_size { + key_buf.clear(); + key.tag.blknum = blknum; + key.pack(&mut key_buf); + trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); + } + let n = (old_rel_size - tag.blknum) as u64; + self.num_entries.fetch_add(n, Ordering::Relaxed); + self.num_wal_records.fetch_add(n, Ordering::Relaxed); + } + // // Memorize a full image of a page version // @@ -584,14 +623,18 @@ impl PageCache { // Can't move backwards. //assert!(lsn >= shared.last_valid_lsn); - if lsn > shared.last_valid_lsn { - shared.last_valid_lsn = lsn; - self.valid_lsn_condvar.notify_all(); + if lsn > shared.last_valid_lsn { + shared.last_valid_lsn = lsn; + self.valid_lsn_condvar.notify_all(); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - } else { - trace!("lsn={}, shared.last_valid_lsn={}", lsn, shared.last_valid_lsn); - } + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + } else { + trace!( + "lsn={}, shared.last_valid_lsn={}", + lsn, + shared.last_valid_lsn + ); + } } // diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b5267688a1..b23c65e44a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -234,7 +234,7 @@ pub fn thread_main(conf: PageServerConf) { loop { let (socket, peer_addr) = listener.accept().await.unwrap(); debug!("accepted connection from {}", peer_addr); - socket.set_nodelay(true).unwrap(); + socket.set_nodelay(true).unwrap(); let mut conn_handler = Connection::new(conf.clone(), socket); task::spawn(async move { @@ -318,7 +318,7 @@ impl Connection { .await?; self.stream.write_i16(1).await?; - self.stream.write_buf(&mut b).await?; + self.stream.write_all(&mut b).await?; self.stream.write_i32(0).await?; /* table oid */ self.stream.write_i16(0).await?; /* attnum */ self.stream.write_i32(25).await?; /* TEXTOID */ @@ -337,7 +337,7 @@ impl Connection { self.stream.write_i16(1).await?; self.stream.write_i32(b.len() as i32).await?; - self.stream.write_buf(&mut b).await?; + self.stream.write_all(&mut b).await?; } BeMessage::ControlFile => { @@ -349,7 +349,7 @@ impl Connection { self.stream.write_i16(1).await?; self.stream.write_i32(b.len() as i32).await?; - self.stream.write_buf(&mut b).await?; + self.stream.write_all(&mut b).await?; } BeMessage::CommandComplete => { @@ -357,7 +357,7 @@ impl Connection { self.stream.write_u8(b'C').await?; self.stream.write_i32(4 + b.len() as i32).await?; - self.stream.write_buf(&mut b).await?; + self.stream.write_all(&mut b).await?; } BeMessage::ZenithStatusResponse(resp) => { @@ -384,7 +384,7 @@ impl Connection { self.stream.write_u8(102).await?; /* tag from pagestore_client.h */ self.stream.write_u8(resp.ok as u8).await?; self.stream.write_u32(resp.n_blocks).await?; 
-                self.stream.write_buf(&mut resp.page.clone()).await?;
+                self.stream.write_all(&mut resp.page.clone()).await?;
             }
         }
 
@@ -405,7 +405,7 @@ impl Connection {
         match m.kind {
             StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
                 let mut b = Bytes::from("N");
-                self.stream.write_buf(&mut b).await?;
+                self.stream.write_all(&mut b).await?;
                 self.stream.flush().await?;
             }
             StartupRequestCode::Normal => {
@@ -508,11 +508,11 @@ impl Connection {
         loop {
             let message = self.read_message().await?;
-/*
-            if let Some(m) = &message {
-                trace!("query({}): {:?}", sysid, m);
-            };
-*/
+            /*
+            if let Some(m) = &message {
+                trace!("query({}): {:?}", sysid, m);
+            };
+            */
             if message.is_none() {
                 // connection was closed
                 return Ok(());
diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs
index 1f1a5dfc99..43ce634970 100644
--- a/pageserver/src/waldecoder.rs
+++ b/pageserver/src/waldecoder.rs
@@ -287,15 +287,22 @@ pub struct DecodedBkpBlock {
 const SizeOfXLogRecord: u32 = 24;
 
 pub struct DecodedWALRecord {
-    pub lsn: u64, // LSN at the *end* of the record
+    pub lsn: u64, // LSN at the *end* of the record
+    pub xl_info: u8,
+    pub xl_rmid: u8,
     pub record: Bytes, // raw XLogRecord
     pub blocks: Vec<DecodedBkpBlock>,
 }
 
 // From pg_control.h and rmgrlist.h
-const XLOG_SWITCH: u8 = 0x40;
-const RM_XLOG_ID: u8 = 0;
+pub const XLOG_SWITCH: u8 = 0x40;
+pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
+pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
+
+pub const RM_XLOG_ID: u8 = 0;
+pub const RM_XACT_ID: u8 = 1;
+pub const RM_SMGR_ID: u8 = 2;
 
 // Is this record an XLOG_SWITCH record? They need some special processing,
 // so we need to check for that before the rest of the parsing.
@@ -316,25 +323,61 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
     return xl_info == XLOG_SWITCH && xl_rmid == RM_XLOG_ID;
 }
 
+pub type Oid = u32;
+pub type BlockNumber = u32;
+
+pub const MAIN_FORKNUM: u8 = 0;
+pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
+
+#[repr(C)]
+#[derive(Debug)]
+pub struct RelFileNode {
+    pub spcnode: Oid, /* tablespace */
+    pub dbnode: Oid,  /* database */
+    pub relnode: Oid, /* relation */
+}
+
+#[repr(C)]
+#[derive(Debug)]
+pub struct XlSmgrTruncate {
+    pub blkno: BlockNumber,
+    pub rnode: RelFileNode,
+    pub flags: u32,
+}
+
+pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate {
+    let mut buf = decoded.record.clone();
+    buf.advance(SizeOfXLogRecord as usize);
+    XlSmgrTruncate {
+        blkno: buf.get_u32_le(),
+        rnode: RelFileNode {
+            spcnode: buf.get_u32_le(), /* tablespace */
+            dbnode: buf.get_u32_le(),  /* database */
+            relnode: buf.get_u32_le(), /* relation */
+        },
+        flags: buf.get_u32_le(),
+    }
+}
+
 //
 // Routines to decode a WAL record and figure out which blocks are modified
 //
-pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
+pub fn decode_wal_record(lsn: u64, record: Bytes) -> DecodedWALRecord {
     trace!(
         "decoding record with LSN {:08X}/{:08X} ({} bytes)",
         lsn >> 32,
         lsn & 0xffff_ffff,
-        rec.remaining()
+        record.remaining()
     );
 
-    let mut buf = rec.clone();
+    let mut buf = record.clone();
 
     // FIXME: assume little-endian here
     let xl_tot_len = buf.get_u32_le();
     let _xl_xid = buf.get_u32_le();
     let _xl_prev = buf.get_u64_le();
-    let _xl_info = buf.get_u8();
-    let _xl_rmid = buf.get_u8();
+    let xl_info = buf.get_u8();
+    let xl_rmid = buf.get_u8();
     buf.advance(2); // 2 bytes of padding
     let _xl_crc = buf.get_u32_le();
 
@@ -582,8 +625,10 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
 
     // Since we don't care about the data payloads here, we're done.
     return DecodedWALRecord {
-        lsn: lsn,
-        record: rec,
-        blocks: blocks,
+        lsn,
+        xl_info,
+        xl_rmid,
+        record,
+        blocks,
     };
 }
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index f41c9274b2..c8a5fa612c 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -13,7 +13,7 @@ use tokio_stream::StreamExt;
 
 use crate::page_cache;
 use crate::page_cache::{BufferTag, RelTag};
-use crate::waldecoder::WalStreamDecoder;
+use crate::waldecoder::*;
 use crate::PageServerConf;
 
 use postgres_protocol::message::backend::ReplicationMessage;
@@ -158,7 +158,29 @@ async fn walreceiver_main(
                         pcache.put_wal_record(tag, rec);
                     }
-
+                    // Include the truncate WAL record in every page that it truncates away
+                    if decoded.xl_rmid == RM_SMGR_ID
+                        && (decoded.xl_info & XLR_RMGR_INFO_MASK) == XLOG_SMGR_TRUNCATE
+                    {
+                        let truncate = decode_truncate_record(&decoded);
+                        if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
+                            let tag = BufferTag {
+                                rel: RelTag {
+                                    spcnode: truncate.rnode.spcnode,
+                                    dbnode: truncate.rnode.dbnode,
+                                    relnode: truncate.rnode.relnode,
+                                    forknum: MAIN_FORKNUM,
+                                },
+                                blknum: truncate.blkno,
+                            };
+                            let rec = page_cache::WALRecord {
+                                lsn,
+                                will_init: false,
+                                rec: recdata.clone(),
+                            };
+                            pcache.put_rel_wal_record(tag, rec);
+                        }
+                    }
                     // Now that this record has been handled, let the page cache know that
                     // it is up-to-date to this LSN
                     pcache.advance_last_valid_lsn(lsn);
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 9fb676d109..5a1489757d 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -67,7 +67,7 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) {
         let _guard = runtime.enter();
         process = WalRedoProcess::launch(&datadir, &runtime).unwrap();
     }
-    info!("WAL redo postgres started");
+    info!("WAL redo postgres started");
 
     // Pretty arbitrarily, reuse the same Postgres process for 100 requests.
     // After that, kill it and start a new one. This is mostly to avoid
@@ -79,7 +79,7 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) {
             let result = handle_apply_request(&pcache, &process, &runtime, request);
             if result.is_err() {
                 // On error, kill the process.
-                error!("Kill wal redo process on error");
+                error!("Kill wal redo process on error");
                 break;
             }
         }
@@ -102,7 +102,7 @@ fn handle_apply_request(
     let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
 
     let mut entry = entry_rc.content.lock().unwrap();
-    assert!(entry.apply_pending);
+    assert!(entry.apply_pending);
     entry.apply_pending = false;
 
     let nrecords = records.len();
@@ -161,14 +161,18 @@ impl WalRedoProcess {
         .expect("failed to execute initdb");
 
         if !initdb.status.success() {
-            panic!("initdb failed: {}\nstderr:\n{}",
-                   std::str::from_utf8(&initdb.stdout).unwrap(),
-                   std::str::from_utf8(&initdb.stderr).unwrap());
+            panic!(
+                "initdb failed: {}\nstderr:\n{}",
+                std::str::from_utf8(&initdb.stdout).unwrap(),
+                std::str::from_utf8(&initdb.stderr).unwrap()
+            );
         } else {
-            // Limit shared cache for wal-redo-postres
-            let mut config = OpenOptions::new().append(true).open(datadir.join("postgresql.conf"))?;
-            config.write(b"shared_buffers=128kB\n")?;
-        }
+            // Limit shared cache for wal-redo-postgres
+            let mut config = OpenOptions::new()
+                .append(true)
+                .open(datadir.join("postgresql.conf"))?;
+            config.write_all(b"shared_buffers=128kB\n")?;
+        }
         // Start postgres itself
         let mut child = Command::new("postgres")
             .arg("--wal-redo")
diff --git a/vendor/postgres b/vendor/postgres
index 90d4144e38..2d0b8458eb 160000
--- a/vendor/postgres
+++ b/vendor/postgres
@@ -1 +1 @@
-Subproject commit 90d4144e386302bae43eae9f332bad42dcb1c631
+Subproject commit 2d0b8458eb53fc45547eb889ec61bee0a9f078dc
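
For illustration only (not part of the patch): a minimal, test-style sketch of the record layout that the new decode_truncate_record() expects, assuming it sits next to the patched code in pageserver/src/waldecoder.rs. The 24-byte XLogRecord header is zero-filled because the decoder only skips over it, and the tablespace/database/relation OIDs are made-up example values.

    #[cfg(test)]
    mod truncate_decode_sketch {
        use super::*;
        use bytes::{BufMut, BytesMut};

        #[test]
        fn decodes_xlog_smgr_truncate_payload() {
            // Fake WAL record: zero-filled 24-byte XLogRecord header, followed by
            // the xl_smgr_truncate body: blkno, RelFileNode, flags (little-endian u32s).
            let mut buf = BytesMut::new();
            buf.resize(24, 0); // SizeOfXLogRecord; contents irrelevant, the decoder skips it
            buf.put_u32_le(100); // blkno: new relation size, in blocks
            buf.put_u32_le(1663); // rnode.spcnode (pg_default tablespace)
            buf.put_u32_le(12345); // rnode.dbnode (example database OID)
            buf.put_u32_le(16384); // rnode.relnode (example relation OID)
            buf.put_u32_le(SMGR_TRUNCATE_HEAP);

            let decoded = DecodedWALRecord {
                lsn: 0,
                xl_info: XLOG_SMGR_TRUNCATE,
                xl_rmid: RM_SMGR_ID,
                record: buf.freeze(),
                blocks: Vec::new(),
            };

            let truncate = decode_truncate_record(&decoded);
            assert_eq!(truncate.blkno, 100);
            assert_eq!(truncate.rnode.relnode, 16384);
            assert!((truncate.flags & SMGR_TRUNCATE_HEAP) != 0);
        }
    }

walreceiver_main() only takes this path when xl_rmid is RM_SMGR_ID, the masked xl_info equals XLOG_SMGR_TRUNCATE, and SMGR_TRUNCATE_HEAP is set in flags; put_rel_wal_record() then attaches the record to every page from the new relation size up to the old one.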