From 52ee3a2bac2477a0b9eaad9ae338ff8ece5b932e Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 23 Apr 2021 17:03:56 +0300 Subject: [PATCH] Support CREATE DATABASE command --- pageserver/src/bin/pageserver.rs | 2 + pageserver/src/page_cache.rs | 106 ++++++++++++++++++++++--------- pageserver/src/waldecoder.rs | 88 +++++++++++++++---------- pageserver/src/walreceiver.rs | 14 +++- vendor/postgres | 2 +- 5 files changed, 147 insertions(+), 65 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 035667d62d..5a0c7ab502 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -229,11 +229,13 @@ fn init_logging(conf: &PageServerConf) -> Result anyhow::Result<()> { + async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result { + let mut lsn = req_lsn; + //When invalid LSN is requested, it means "don't wait, return latest version of the page" + //This is necessary for bootstrap. + if lsn == 0 { + lsn = self.last_valid_lsn.load(Ordering::Acquire); + trace!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + self.last_valid_lsn.load(Ordering::Acquire), + lsn + ); + } self.seqwait_lsn .wait_for_timeout(lsn, TIMEOUT) .await @@ -506,7 +518,7 @@ impl PageCache { ) })?; - Ok(()) + Ok(lsn) } // @@ -517,22 +529,7 @@ impl PageCache { pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result { self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); - let mut lsn = req_lsn; - //When invalid LSN is requested, it means "don't wait, return latest version of the page" - //This is necessary for bootstrap. - if lsn == 0 - { - lsn = self.last_valid_lsn.load(Ordering::Acquire); - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - self.last_valid_lsn.load(Ordering::Acquire), - lsn - ); - } - else - { - self.wait_lsn(lsn).await?; - } + let lsn = self.wait_lsn(req_lsn).await?; // Look up cache entry. If it's a page image, return that. If it's a WAL record, // ask the WAL redo service to reconstruct the page image from the WAL records. @@ -555,6 +552,7 @@ impl PageCache { if entry_opt.is_none() { static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn); return Ok(Bytes::from_static(&ZERO_PAGE)); /* return Err("could not find page image")?; */ } @@ -578,7 +576,7 @@ impl PageCache { // FIXME: assumes little-endian. Only used for the debugging log though let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - trace!( + debug!( "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", page_lsn_hi, page_lsn_lo, @@ -738,7 +736,6 @@ impl PageCache { // Can't move backwards. let oldlsn = shared.last_valid_lsn; if lsn >= oldlsn { - shared.last_valid_lsn = lsn; self.seqwait_lsn.advance(lsn); @@ -809,9 +806,10 @@ impl PageCache { shared.last_record_lsn } - pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + pub async fn relsize_get(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { + let mut lsn = req_lsn; if lsn != u64::MAX { - self.wait_lsn(lsn).await?; + lsn = self.wait_lsn(lsn).await?; } let mut key = CacheKey { @@ -848,18 +846,18 @@ impl PageCache { } } let relsize = tag.blknum + 1; - trace!("Size of relation {:?} at {} is {}", rel, lsn, relsize); + debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize); return Ok(relsize); } } break; } - trace!("Size of relation {:?} at {} is zero", rel, lsn); + debug!("Size of relation {:?} at {} is zero", rel, lsn); Ok(0) } - pub async fn relsize_exist(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { - self.wait_lsn(lsn).await?; + pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { + let lsn = self.wait_lsn(req_lsn).await?; let key = CacheKey { tag: BufferTag { @@ -879,11 +877,11 @@ impl PageCache { buf.extend_from_slice(&k); let tag = BufferTag::unpack(&mut buf); if tag.rel == *rel { - trace!("Relation {:?} exists at {}", rel, lsn); + debug!("Relation {:?} exists at {}", rel, lsn); return Ok(true); } } - trace!("Relation {:?} doesn't exist at {}", rel, lsn); + debug!("Relation {:?} doesn't exist at {}", rel, lsn); Ok(false) } @@ -898,6 +896,56 @@ impl PageCache { last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), } } + + pub fn create_database( + &self, + lsn: u64, + db_id: Oid, + tablespace_id: Oid, + src_db_id: Oid, + src_tablespace_id: Oid, + ) -> anyhow::Result<()> { + let mut buf = BytesMut::new(); + let key = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: src_tablespace_id, + dbnode: src_db_id, + relnode: 0, + forknum: 0u8, + }, + blknum: 0, + }, + lsn: 0, + }; + key.pack(&mut buf); + let iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Forward, + )); + let mut n = 0; + for (k, v) in iter { + buf.clear(); + buf.extend_from_slice(&k); + let mut key = CacheKey::unpack(&mut buf); + if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { + break; + } + key.tag.rel.spcnode = tablespace_id; + key.tag.rel.dbnode = db_id; + key.lsn = lsn; + buf.clear(); + key.pack(&mut buf); + + self.db.put(&buf[..], v)?; + n += 1; + } + info!( + "Create database {}/{}, copy {} entries", + tablespace_id, db_id, n + ); + Ok(()) + } } pub fn get_stats() -> PageCacheStats { diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 4ba1a373fe..6cfd446a7f 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -2,8 +2,8 @@ use crate::pg_constants; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use std::cmp::min; -use thiserror::Error; use std::str; +use thiserror::Error; const XLOG_BLCKSZ: u32 = 8192; @@ -378,17 +378,41 @@ pub struct XlSmgrTruncate { pub flags: u32, } -pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate { - let mut buf = decoded.record.clone(); - buf.advance((SizeOfXLogRecord + 2) as usize); - XlSmgrTruncate { - blkno: buf.get_u32_le(), - rnode: RelFileNode { - spcnode: buf.get_u32_le(), /* tablespace */ - dbnode: buf.get_u32_le(), /* database */ - relnode: buf.get_u32_le(), /* relation */ - }, - flags: buf.get_u32_le(), +impl XlSmgrTruncate { + pub fn decode(decoded: &DecodedWALRecord) -> XlSmgrTruncate { + let mut buf = decoded.record.clone(); + buf.advance((SizeOfXLogRecord + 2) as usize); + XlSmgrTruncate { + blkno: buf.get_u32_le(), + rnode: RelFileNode { + spcnode: buf.get_u32_le(), /* tablespace */ + dbnode: buf.get_u32_le(), /* database */ + relnode: buf.get_u32_le(), /* relation */ + }, + flags: buf.get_u32_le(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlCreateDatabase { + pub db_id: Oid, + pub tablespace_id: Oid, + pub src_db_id: Oid, + pub src_tablespace_id: Oid, +} + +impl XlCreateDatabase { + pub fn decode(decoded: &DecodedWALRecord) -> XlCreateDatabase { + let mut buf = decoded.record.clone(); + buf.advance((SizeOfXLogRecord + 2) as usize); + XlCreateDatabase { + db_id: buf.get_u32_le(), + tablespace_id: buf.get_u32_le(), + src_db_id: buf.get_u32_le(), + src_tablespace_id: buf.get_u32_le(), + } } } @@ -678,38 +702,34 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { blocks.push(blk); //TODO parse abort record to extract subtrans entries } - } - else if xl_rmid == pg_constants::RM_DBASE_ID - { + } else if xl_rmid == pg_constants::RM_DBASE_ID { let info = xl_info & !pg_constants::XLR_INFO_MASK; - if info == pg_constants::XLOG_DBASE_CREATE - { + if info == pg_constants::XLOG_DBASE_CREATE { //buf points to main_data - let db_id = buf.get_u32_le(); - let tablespace_id = buf.get_u32_le(); - let src_db_id = buf.get_u32_le(); - let src_tablespace_id = buf.get_u32_le(); - trace!("XLOG_DBASE_CREATE db_id {} src_db_id {}", db_id, src_db_id); + let db_id = buf.get_u32_le(); + let tablespace_id = buf.get_u32_le(); + let src_db_id = buf.get_u32_le(); + let src_tablespace_id = buf.get_u32_le(); + trace!( + "XLOG_DBASE_CREATE tablespace_id/db_id {}/{} src_db_id {}/{}", + tablespace_id, + db_id, + src_tablespace_id, + src_db_id + ); // in postgres it is implemented as copydir // we need to copy all pages in page_cache - } - else - { + } else { trace!("XLOG_DBASE_DROP is not handled yet"); } - } - else if xl_rmid == pg_constants::RM_TBLSPC_ID - { + } else if xl_rmid == pg_constants::RM_TBLSPC_ID { let info = xl_info & !pg_constants::XLR_INFO_MASK; - if info == pg_constants::XLOG_TBLSPC_CREATE - { + if info == pg_constants::XLOG_TBLSPC_CREATE { //buf points to main_data - let ts_id = buf.get_u32_le(); + let ts_id = buf.get_u32_le(); let ts_path = str::from_utf8(&buf).unwrap(); trace!("XLOG_TBLSPC_CREATE ts_id {} ts_path {}", ts_id, ts_path); - } - else - { + } else { trace!("XLOG_TBLSPC_DROP is not handled yet"); } } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 20d6551630..65c543c64e 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -248,7 +248,7 @@ async fn walreceiver_main( && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE { - let truncate = decode_truncate_record(&decoded); + let truncate = XlSmgrTruncate::decode(&decoded); if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 { let tag = BufferTag { rel: RelTag { @@ -268,6 +268,18 @@ async fn walreceiver_main( }; pcache.put_rel_wal_record(tag, rec).await?; } + } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID + && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == pg_constants::XLOG_DBASE_CREATE + { + let createdb = XlCreateDatabase::decode(&decoded); + pcache.create_database( + lsn, + createdb.db_id, + createdb.tablespace_id, + createdb.src_db_id, + createdb.src_tablespace_id, + )?; } // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN diff --git a/vendor/postgres b/vendor/postgres index af9c507616..cf2e6d1904 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit af9c50761691c9e6890eecc0396ba85177958ba8 +Subproject commit cf2e6d1904f9bd4c9fd23d9162744a1e7f3a963c