smgr <-> page service

This commit is contained in:
Stas Kelvich
2021-03-19 19:55:17 +03:00
parent ab3434f30f
commit 0298a6dad6
4 changed files with 159 additions and 38 deletions

View File

@@ -4,7 +4,7 @@
//
//
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::sync::Mutex;
use bytes::Bytes;
@@ -13,12 +13,20 @@ use rand::Rng;
use crate::walredo;
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
pub struct RelTag {
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub forknum: u8,
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct BufferTag {
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub forknum: u32,
pub forknum: u8,
pub blknum: u32,
}
@@ -37,6 +45,13 @@ struct PageCacheShared {
// The actual page cache
pagecache: BTreeMap<CacheKey, CacheEntry>,
// Relation n_blocks cache
//
// This hashtable should be updated together with the pagecache. Now it is
// accessed unreasonably often through the smgr_nblocks(). It is better to just
// cache it in postgres smgr and ask only on restart.
relsize_cache: HashMap<RelTag, u32>,
// What page versions do we hold in the cache? If we get GetPage with
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
// page version. If we get a request > last_valid_lsn, we need to wait until
@@ -50,6 +65,7 @@ lazy_static! {
static ref PAGECACHE: Mutex<PageCacheShared> = Mutex::new(
PageCacheShared {
pagecache: BTreeMap::new(),
relsize_cache: HashMap::new(),
first_valid_lsn: 0,
last_valid_lsn: 0,
});
@@ -155,7 +171,9 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
} else if base_img.is_some() {
page_img = base_img.unwrap();
} else {
return Err("could not find page image")?;
let zero_page = vec![0 as u8; 8192];
page_img = Bytes::from(zero_page);
/* return Err("could not find page image")?; */
}
return Ok(page_img);
@@ -174,9 +192,20 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
let entry = CacheEntry::WALRecord(rec);
let mut shared = PAGECACHE.lock().unwrap();
let pagecache = &mut shared.pagecache;
// let pagecache = &mut shared.pagecache;
let oldentry = pagecache.insert(key, entry);
let rel_tag = RelTag {
spcnode: tag.spcnode,
dbnode: tag.dbnode,
relnode: tag.relnode,
forknum: tag.forknum,
};
let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0);
if tag.blknum >= *rel_entry {
*rel_entry = tag.blknum + 1;
}
let oldentry = shared.pagecache.insert(key, entry);
assert!(oldentry.is_none());
}
@@ -262,3 +291,29 @@ pub fn test_get_page_at_lsn()
}
}
}
pub fn relsize_inc(rel: &RelTag, to: Option<u32>)
{
let mut shared = PAGECACHE.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
if let Some(to) = to {
if to >= *entry {
*entry = to + 1;
}
}
}
pub fn relsize_get(rel: &RelTag) -> u32
{
let mut shared = PAGECACHE.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
*entry
}
pub fn relsize_exist(rel: &RelTag) -> bool
{
let shared = PAGECACHE.lock().unwrap();
let relsize_cache = &shared.relsize_cache;
relsize_cache.contains_key(rel)
}

View File

@@ -15,10 +15,13 @@ use tokio::runtime;
use tokio::task;
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, Bytes, BytesMut};
use std::io::{self};
use std::io;
use crate::page_cache;
type Result<T> = std::result::Result<T, io::Error>;
#[derive(Debug)]
enum FeMessage {
StartupMessage(FeStartupMessage),
Query(FeQueryMessage),
@@ -36,6 +39,7 @@ enum FeMessage {
ZenithExtendRequest(ZenithRequest),
}
#[derive(Debug)]
enum BeMessage {
AuthenticationOk,
ReadyForQuery,
@@ -53,23 +57,23 @@ enum BeMessage {
#[derive(Debug)]
struct ZenithRequest {
spc_node: i32,
db_node: i32,
rel_node: i32,
spcnode: u32,
dbnode: u32,
relnode: u32,
forknum: u8,
blkno: i32,
blkno: u32,
}
#[derive(Debug)]
struct ZenithStatusResponse {
ok: bool,
n_blocks: i32,
n_blocks: u32,
}
#[derive(Debug)]
struct ZenithReadResponse {
ok: bool,
n_blocks: i32,
n_blocks: u32,
page: Bytes
}
@@ -165,11 +169,11 @@ impl FeMessage {
b'd' => {
let smgr_tag = body.get_u8();
let zreq = ZenithRequest {
spc_node: body.get_i32(),
db_node: body.get_i32(),
rel_node: body.get_i32(),
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
blkno: body.get_i32(),
blkno: body.get_u32(),
};
// TODO: consider using protobuf or serde bincode for less error prone
@@ -329,19 +333,28 @@ impl Connection {
self.stream.write_buf(&mut b).await?;
}
BeMessage::ZenithStatusResponse(resp) |
BeMessage::ZenithStatusResponse(resp) => {
self.stream.write_u8(b'd').await?;
self.stream.write_u32(4 + 1 + 1 + 4).await?;
self.stream.write_u8(100).await?; /* tag from pagestore_client.h */
self.stream.write_u8(resp.ok as u8).await?;
self.stream.write_u32(resp.n_blocks).await?;
}
BeMessage::ZenithNblocksResponse(resp) => {
self.stream.write_u8(b'd').await?;
self.stream.write_i32(4 + 1 + 4).await?;
self.stream.write_u32(4 + 1 + 1 + 4).await?;
self.stream.write_u8(101).await?; /* tag from pagestore_client.h */
self.stream.write_u8(resp.ok as u8).await?;
self.stream.write_i32(resp.n_blocks).await?;
self.stream.write_u32(resp.n_blocks).await?;
}
BeMessage::ZenithReadResponse(resp) => {
self.stream.write_u8(b'd').await?;
self.stream.write_i32(4 + 1 + 4 + resp.page.len() as i32).await?;
self.stream.write_u32(4 + 1 + 1 + 4 + resp.page.len() as u32).await?;
self.stream.write_u8(102).await?; /* tag from pagestore_client.h */
self.stream.write_u8(resp.ok as u8).await?;
self.stream.write_i32(resp.n_blocks).await?;
self.stream.write_u32(resp.n_blocks).await?;
self.stream.write_buf(&mut resp.page.clone()).await?;
}
}
@@ -425,10 +438,24 @@ impl Connection {
self.stream.flush().await?;
loop {
match self.read_message().await? {
Some(FeMessage::ZenithExistsRequest(_)) => {
let message = self.read_message().await?;
// println!("query: {:?}", message);
match message {
Some(FeMessage::ZenithExistsRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
let exist = page_cache::relsize_exist(&tag);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
ok: exist,
n_blocks: 0
})).await?
}
@@ -444,27 +471,66 @@ impl Connection {
n_blocks: 0
})).await?
}
Some(FeMessage::ZenithNblocksRequest(_)) => {
Some(FeMessage::ZenithNblocksRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
let n_blocks = page_cache::relsize_get(&tag);
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
n_blocks: n_blocks
})).await?
}
Some(FeMessage::ZenithReadRequest(_)) => {
let zero_page = vec![0 as u8; 8192];
self.write_message(&BeMessage::ZenithReadResponse(ZenithReadResponse {
Some(FeMessage::ZenithReadRequest(req)) => {
let buf_tag = page_cache::BufferTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
blknum: req.blkno
};
let inf_lsn = 0xffff_ffff_ffff_eeee;
let msg = BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: true,
n_blocks: 0,
page: Bytes::from(zero_page),
})).await?
page: page_cache::get_page_at_lsn(buf_tag, inf_lsn).unwrap()
});
self.write_message(&msg).await?
}
Some(FeMessage::ZenithCreateRequest(_)) => {
Some(FeMessage::ZenithCreateRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
page_cache::relsize_inc(&tag, None);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
}
Some(FeMessage::ZenithExtendRequest(_)) => {
Some(FeMessage::ZenithExtendRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
page_cache::relsize_inc(&tag, Some(req.blkno));
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0

View File

@@ -45,7 +45,7 @@ async fn walreceiver_main() -> Result<(), Error> {
// Connect to the database in replication mode.
println!("connecting...");
let (mut rclient, connection) =
connect_replication("host=localhost user=postgres", NoTls, ReplicationMode::Physical).await?;
connect_replication("host=localhost user=stas dbname=postgres port=65432", NoTls, ReplicationMode::Physical).await?;
println!("connected!");
@@ -111,7 +111,7 @@ async fn walreceiver_main() -> Result<(), Error> {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u32,
forknum: blk.forknum as u8,
blknum: blk.blkno
};

View File

@@ -78,7 +78,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
return buf.freeze();
@@ -95,7 +95,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
buf.put(base_img);
@@ -123,7 +123,7 @@ fn build_get_page_msg(tag: BufferTag, ) -> Bytes {
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
return buf.freeze();