page server: Minor code cleanup

This commit is contained in:
Heikki Linnakangas
2021-03-18 09:39:46 +02:00
committed by Stas Kelvich
parent 939281101e
commit bc4b2787f4
4 changed files with 82 additions and 60 deletions

View File

@@ -1,3 +1,7 @@
//
// Main entry point for the Page Server executable
//
use std::thread;
mod page_cache;
@@ -21,6 +25,9 @@ fn main() -> Result<(), Error> {
});
threads.push(walreceiver_thread);
// GetPage@LSN requests are served by another thread. (It uses async I/O,
// but the code in page_service sets up it own thread pool for that)
let page_server_thread = thread::spawn(|| {
// thread code
page_service::thread_main();

View File

@@ -1,6 +1,12 @@
//
// Page Cache holds all the different all the different page versions and WAL records
//
//
//
use std::collections::BTreeMap;
use std::sync::Mutex;
use std::error::Error;
use std::sync::Mutex;
use bytes::Bytes;
use lazy_static::lazy_static;
use rand::Rng;
@@ -57,56 +63,6 @@ lazy_static! {
// Public interface functions
//
// Simple test function for the WAL redo code:
//
// 1. Pick a page from the page cache at random.
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
//
//
pub fn test_get_page_at_lsn()
{
// for quick testing of the get_page_at_lsn() funcion.
//
// Get a random page from the page cache. Apply all its WAL, by requesting
// that page at the highest lsn.
let mut tag: Option<BufferTag> = None;
{
let pagecache = PAGECACHE.lock().unwrap();
if pagecache.is_empty() {
println!("page cache is empty");
return;
}
// Find nth entry in the map, where
let n = rand::thread_rng().gen_range(0..pagecache.len());
let mut i = 0;
for (key, _e) in pagecache.iter() {
if i == n {
tag = Some(key.tag);
break;
}
i +=1;
}
}
println!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
match get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) {
Ok(_img) => {
// This prints out the whole page image.
//println!("{:X?}", img);
},
Err(error) => {
println!("GetPage@LSN failed: {}", error);
}
}
}
//
// GetPage@LSN
//
@@ -180,7 +136,7 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
}
//
// Add WAL record
// Adds a WAL record to the page cache
//
#[allow(dead_code)]
#[allow(unused_variables)]
@@ -198,3 +154,53 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
let oldentry = pagecache.insert(key, entry);
assert!(oldentry.is_none());
}
//
// Simple test function for the WAL redo code:
//
// 1. Pick a page from the page cache at random.
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
//
//
pub fn test_get_page_at_lsn()
{
// for quick testing of the get_page_at_lsn() funcion.
//
// Get a random page from the page cache. Apply all its WAL, by requesting
// that page at the highest lsn.
let mut tag: Option<BufferTag> = None;
{
let pagecache = PAGECACHE.lock().unwrap();
if pagecache.is_empty() {
println!("page cache is empty");
return;
}
// Find nth entry in the map, where n is picked at random
let n = rand::thread_rng().gen_range(0..pagecache.len());
let mut i = 0;
for (key, _e) in pagecache.iter() {
if i == n {
tag = Some(key.tag);
break;
}
i += 1;
}
}
println!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
match get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) {
Ok(_img) => {
// This prints out the whole page image.
//println!("{:X?}", img);
},
Err(error) => {
println!("GetPage@LSN failed: {}", error);
}
}
}

View File

@@ -1,3 +1,7 @@
//
// The Page Service listens for client connections and serves their GetPage@LSN requests
//
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::runtime;
@@ -113,6 +117,7 @@ impl FeMessage {
pub fn thread_main() {
// Create a new thread pool
let runtime = runtime::Runtime::new().unwrap();
let listen_address = "127.0.0.1:5430";

View File

@@ -1,3 +1,11 @@
//
// WAL receiver
//
// The WAL receiver connects to the WAL safekeeper service, and streams WAL.
// For each WAL record, it decodes the record to figure out which data blocks
// the record affects, and adds the records to the page cache.
//
use tokio_stream::StreamExt;
use tokio::runtime;
use tokio::time::{sleep, Duration};
@@ -12,16 +20,14 @@ use postgres_protocol::message::backend::ReplicationMessage;
//
// This is the entry point for the WAL receiver thread.
//
// TODO: if the connection is lost, reconnect.
//
pub fn thread_main() {
println!("Starting WAL receiver");
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap(); // FIXME don't unwrap
println!("Starting WAL receiver");
.unwrap();
runtime.block_on( async {
loop {
@@ -34,7 +40,7 @@ pub fn thread_main() {
});
}
pub async fn walreceiver_main() -> Result<(), Error> {
async fn walreceiver_main() -> Result<(), Error> {
// Connect to the database in replication mode.
println!("connecting...");
@@ -53,8 +59,6 @@ pub async fn walreceiver_main() -> Result<(), Error> {
let identify_system = rclient.identify_system().await?;
println!("identify_system");
//
// Start streaming the WAL.
//