From 0298a6dad6d8015f7021c2b25f802505ff6df982 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 19 Mar 2021 19:55:17 +0300 Subject: [PATCH] smgr <-> page service --- src/page_cache.rs | 65 +++++++++++++++++++++-- src/page_service.rs | 122 ++++++++++++++++++++++++++++++++++---------- src/walreceiver.rs | 4 +- src/walredo.rs | 6 +-- 4 files changed, 159 insertions(+), 38 deletions(-) diff --git a/src/page_cache.rs b/src/page_cache.rs index 560ad53e86..9b6e90d1f8 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -4,7 +4,7 @@ // // -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::error::Error; use std::sync::Mutex; use bytes::Bytes; @@ -13,12 +13,20 @@ use rand::Rng; use crate::walredo; +#[derive(Eq, PartialEq, Hash, Clone, Copy)] +pub struct RelTag { + pub spcnode: u32, + pub dbnode: u32, + pub relnode: u32, + pub forknum: u8, +} + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub struct BufferTag { pub spcnode: u32, pub dbnode: u32, pub relnode: u32, - pub forknum: u32, + pub forknum: u8, pub blknum: u32, } @@ -37,6 +45,13 @@ struct PageCacheShared { // The actual page cache pagecache: BTreeMap, + // Relation n_blocks cache + // + // This hashtable should be updated together with the pagecache. Now it is + // accessed unreasonably often through the smgr_nblocks(). It is better to just + // cache it in postgres smgr and ask only on restart. + relsize_cache: HashMap, + // What page versions do we hold in the cache? If we get GetPage with // LSN < first_valid_lsn, that's an error because we (no longer) hold that // page version. If we get a request > last_valid_lsn, we need to wait until @@ -50,6 +65,7 @@ lazy_static! { static ref PAGECACHE: Mutex = Mutex::new( PageCacheShared { pagecache: BTreeMap::new(), + relsize_cache: HashMap::new(), first_valid_lsn: 0, last_valid_lsn: 0, }); @@ -155,7 +171,9 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result } else if base_img.is_some() { page_img = base_img.unwrap(); } else { - return Err("could not find page image")?; + let zero_page = vec![0 as u8; 8192]; + page_img = Bytes::from(zero_page); + /* return Err("could not find page image")?; */ } return Ok(page_img); @@ -174,9 +192,20 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord) let entry = CacheEntry::WALRecord(rec); let mut shared = PAGECACHE.lock().unwrap(); - let pagecache = &mut shared.pagecache; + // let pagecache = &mut shared.pagecache; - let oldentry = pagecache.insert(key, entry); + let rel_tag = RelTag { + spcnode: tag.spcnode, + dbnode: tag.dbnode, + relnode: tag.relnode, + forknum: tag.forknum, + }; + let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0); + if tag.blknum >= *rel_entry { + *rel_entry = tag.blknum + 1; + } + + let oldentry = shared.pagecache.insert(key, entry); assert!(oldentry.is_none()); } @@ -262,3 +291,29 @@ pub fn test_get_page_at_lsn() } } } + +pub fn relsize_inc(rel: &RelTag, to: Option) +{ + let mut shared = PAGECACHE.lock().unwrap(); + let entry = shared.relsize_cache.entry(*rel).or_insert(0); + + if let Some(to) = to { + if to >= *entry { + *entry = to + 1; + } + } +} + +pub fn relsize_get(rel: &RelTag) -> u32 +{ + let mut shared = PAGECACHE.lock().unwrap(); + let entry = shared.relsize_cache.entry(*rel).or_insert(0); + *entry +} + +pub fn relsize_exist(rel: &RelTag) -> bool +{ + let shared = PAGECACHE.lock().unwrap(); + let relsize_cache = &shared.relsize_cache; + relsize_cache.contains_key(rel) +} diff --git a/src/page_service.rs b/src/page_service.rs index 57449c3710..8c57bacfef 100644 --- a/src/page_service.rs +++ b/src/page_service.rs @@ -15,10 +15,13 @@ use tokio::runtime; use tokio::task; use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, Bytes, BytesMut}; -use std::io::{self}; +use std::io; + +use crate::page_cache; type Result = std::result::Result; +#[derive(Debug)] enum FeMessage { StartupMessage(FeStartupMessage), Query(FeQueryMessage), @@ -36,6 +39,7 @@ enum FeMessage { ZenithExtendRequest(ZenithRequest), } +#[derive(Debug)] enum BeMessage { AuthenticationOk, ReadyForQuery, @@ -53,23 +57,23 @@ enum BeMessage { #[derive(Debug)] struct ZenithRequest { - spc_node: i32, - db_node: i32, - rel_node: i32, + spcnode: u32, + dbnode: u32, + relnode: u32, forknum: u8, - blkno: i32, + blkno: u32, } #[derive(Debug)] struct ZenithStatusResponse { ok: bool, - n_blocks: i32, + n_blocks: u32, } #[derive(Debug)] struct ZenithReadResponse { ok: bool, - n_blocks: i32, + n_blocks: u32, page: Bytes } @@ -165,11 +169,11 @@ impl FeMessage { b'd' => { let smgr_tag = body.get_u8(); let zreq = ZenithRequest { - spc_node: body.get_i32(), - db_node: body.get_i32(), - rel_node: body.get_i32(), + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), forknum: body.get_u8(), - blkno: body.get_i32(), + blkno: body.get_u32(), }; // TODO: consider using protobuf or serde bincode for less error prone @@ -329,19 +333,28 @@ impl Connection { self.stream.write_buf(&mut b).await?; } - BeMessage::ZenithStatusResponse(resp) | + BeMessage::ZenithStatusResponse(resp) => { + self.stream.write_u8(b'd').await?; + self.stream.write_u32(4 + 1 + 1 + 4).await?; + self.stream.write_u8(100).await?; /* tag from pagestore_client.h */ + self.stream.write_u8(resp.ok as u8).await?; + self.stream.write_u32(resp.n_blocks).await?; + } + BeMessage::ZenithNblocksResponse(resp) => { self.stream.write_u8(b'd').await?; - self.stream.write_i32(4 + 1 + 4).await?; + self.stream.write_u32(4 + 1 + 1 + 4).await?; + self.stream.write_u8(101).await?; /* tag from pagestore_client.h */ self.stream.write_u8(resp.ok as u8).await?; - self.stream.write_i32(resp.n_blocks).await?; + self.stream.write_u32(resp.n_blocks).await?; } BeMessage::ZenithReadResponse(resp) => { self.stream.write_u8(b'd').await?; - self.stream.write_i32(4 + 1 + 4 + resp.page.len() as i32).await?; + self.stream.write_u32(4 + 1 + 1 + 4 + resp.page.len() as u32).await?; + self.stream.write_u8(102).await?; /* tag from pagestore_client.h */ self.stream.write_u8(resp.ok as u8).await?; - self.stream.write_i32(resp.n_blocks).await?; + self.stream.write_u32(resp.n_blocks).await?; self.stream.write_buf(&mut resp.page.clone()).await?; } } @@ -425,10 +438,24 @@ impl Connection { self.stream.flush().await?; loop { - match self.read_message().await? { - Some(FeMessage::ZenithExistsRequest(_)) => { + let message = self.read_message().await?; + + // println!("query: {:?}", message); + + match message { + Some(FeMessage::ZenithExistsRequest(req)) => { + + let tag = page_cache::RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }; + + let exist = page_cache::relsize_exist(&tag); + self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { - ok: true, + ok: exist, n_blocks: 0 })).await? } @@ -444,27 +471,66 @@ impl Connection { n_blocks: 0 })).await? } - Some(FeMessage::ZenithNblocksRequest(_)) => { + Some(FeMessage::ZenithNblocksRequest(req)) => { + + let tag = page_cache::RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }; + + let n_blocks = page_cache::relsize_get(&tag); + self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse { ok: true, - n_blocks: 0 + n_blocks: n_blocks })).await? } - Some(FeMessage::ZenithReadRequest(_)) => { - let zero_page = vec![0 as u8; 8192]; - self.write_message(&BeMessage::ZenithReadResponse(ZenithReadResponse { + Some(FeMessage::ZenithReadRequest(req)) => { + let buf_tag = page_cache::BufferTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + blknum: req.blkno + }; + + let inf_lsn = 0xffff_ffff_ffff_eeee; + let msg = BeMessage::ZenithReadResponse(ZenithReadResponse { ok: true, n_blocks: 0, - page: Bytes::from(zero_page), - })).await? + page: page_cache::get_page_at_lsn(buf_tag, inf_lsn).unwrap() + }); + + self.write_message(&msg).await? + } - Some(FeMessage::ZenithCreateRequest(_)) => { + Some(FeMessage::ZenithCreateRequest(req)) => { + let tag = page_cache::RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }; + + page_cache::relsize_inc(&tag, None); + self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { ok: true, n_blocks: 0 })).await? } - Some(FeMessage::ZenithExtendRequest(_)) => { + Some(FeMessage::ZenithExtendRequest(req)) => { + let tag = page_cache::RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }; + + page_cache::relsize_inc(&tag, Some(req.blkno)); + self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { ok: true, n_blocks: 0 diff --git a/src/walreceiver.rs b/src/walreceiver.rs index 56baac12b5..c8af15c64e 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -45,7 +45,7 @@ async fn walreceiver_main() -> Result<(), Error> { // Connect to the database in replication mode. println!("connecting..."); let (mut rclient, connection) = - connect_replication("host=localhost user=postgres", NoTls, ReplicationMode::Physical).await?; + connect_replication("host=localhost user=stas dbname=postgres port=65432", NoTls, ReplicationMode::Physical).await?; println!("connected!"); @@ -111,7 +111,7 @@ async fn walreceiver_main() -> Result<(), Error> { spcnode: blk.rnode_spcnode, dbnode: blk.rnode_dbnode, relnode: blk.rnode_relnode, - forknum: blk.forknum as u32, + forknum: blk.forknum as u8, blknum: blk.blkno }; diff --git a/src/walredo.rs b/src/walredo.rs index a33472c367..c0d58bf56b 100644 --- a/src/walredo.rs +++ b/src/walredo.rs @@ -78,7 +78,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes buf.put_u32(tag.spcnode); buf.put_u32(tag.dbnode); buf.put_u32(tag.relnode); - buf.put_u32(tag.forknum); + buf.put_u32(tag.forknum as u32); buf.put_u32(tag.blknum); return buf.freeze(); @@ -95,7 +95,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes buf.put_u32(tag.spcnode); buf.put_u32(tag.dbnode); buf.put_u32(tag.relnode); - buf.put_u32(tag.forknum); + buf.put_u32(tag.forknum as u32); buf.put_u32(tag.blknum); buf.put(base_img); @@ -123,7 +123,7 @@ fn build_get_page_msg(tag: BufferTag, ) -> Bytes { buf.put_u32(tag.spcnode); buf.put_u32(tag.dbnode); buf.put_u32(tag.relnode); - buf.put_u32(tag.forknum); + buf.put_u32(tag.forknum as u32); buf.put_u32(tag.blknum); return buf.freeze();