diff --git a/src/main.rs b/src/main.rs index 8bf7ac9b38..1fa7cc828f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,7 @@ +// +// Main entry point for the Page Server executable +// + use std::thread; mod page_cache; @@ -21,6 +25,9 @@ fn main() -> Result<(), Error> { }); threads.push(walreceiver_thread); + // GetPage@LSN requests are served by another thread. (It uses async I/O, + // but the code in page_service sets up it own thread pool for that) + let page_server_thread = thread::spawn(|| { // thread code page_service::thread_main(); diff --git a/src/page_cache.rs b/src/page_cache.rs index 63f24fa107..daebade530 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -1,6 +1,12 @@ +// +// Page Cache holds all the different all the different page versions and WAL records +// +// +// + use std::collections::BTreeMap; -use std::sync::Mutex; use std::error::Error; +use std::sync::Mutex; use bytes::Bytes; use lazy_static::lazy_static; use rand::Rng; @@ -57,56 +63,6 @@ lazy_static! { // Public interface functions - -// -// Simple test function for the WAL redo code: -// -// 1. Pick a page from the page cache at random. -// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) -// -// -pub fn test_get_page_at_lsn() -{ - // for quick testing of the get_page_at_lsn() funcion. - // - // Get a random page from the page cache. Apply all its WAL, by requesting - // that page at the highest lsn. - - let mut tag: Option = None; - - { - let pagecache = PAGECACHE.lock().unwrap(); - - if pagecache.is_empty() { - println!("page cache is empty"); - return; - } - - // Find nth entry in the map, where - let n = rand::thread_rng().gen_range(0..pagecache.len()); - let mut i = 0; - for (key, _e) in pagecache.iter() { - if i == n { - tag = Some(key.tag); - break; - } - i +=1; - } - } - - println!("testing GetPage@LSN for block {}", tag.unwrap().blknum); - match get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { - Ok(_img) => { - // This prints out the whole page image. - //println!("{:X?}", img); - }, - Err(error) => { - println!("GetPage@LSN failed: {}", error); - } - } -} - - // // GetPage@LSN // @@ -180,7 +136,7 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result } // -// Add WAL record +// Adds a WAL record to the page cache // #[allow(dead_code)] #[allow(unused_variables)] @@ -198,3 +154,53 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord) let oldentry = pagecache.insert(key, entry); assert!(oldentry.is_none()); } + + + +// +// Simple test function for the WAL redo code: +// +// 1. Pick a page from the page cache at random. +// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) +// +// +pub fn test_get_page_at_lsn() +{ + // for quick testing of the get_page_at_lsn() funcion. + // + // Get a random page from the page cache. Apply all its WAL, by requesting + // that page at the highest lsn. + + let mut tag: Option = None; + + { + let pagecache = PAGECACHE.lock().unwrap(); + + if pagecache.is_empty() { + println!("page cache is empty"); + return; + } + + // Find nth entry in the map, where n is picked at random + let n = rand::thread_rng().gen_range(0..pagecache.len()); + let mut i = 0; + for (key, _e) in pagecache.iter() { + if i == n { + tag = Some(key.tag); + break; + } + i += 1; + } + } + + println!("testing GetPage@LSN for block {}", tag.unwrap().blknum); + match get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { + Ok(_img) => { + // This prints out the whole page image. + //println!("{:X?}", img); + }, + Err(error) => { + println!("GetPage@LSN failed: {}", error); + } + } +} diff --git a/src/page_service.rs b/src/page_service.rs index 886b9e6cfb..1dc8a8eed3 100644 --- a/src/page_service.rs +++ b/src/page_service.rs @@ -1,3 +1,7 @@ +// +// The Page Service listens for client connections and serves their GetPage@LSN requests +// + use tokio::net::{TcpListener, TcpStream}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::runtime; @@ -113,6 +117,7 @@ impl FeMessage { pub fn thread_main() { + // Create a new thread pool let runtime = runtime::Runtime::new().unwrap(); let listen_address = "127.0.0.1:5430"; diff --git a/src/walreceiver.rs b/src/walreceiver.rs index 9d44090229..08cbd7f2a7 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -1,3 +1,11 @@ +// +// WAL receiver +// +// The WAL receiver connects to the WAL safekeeper service, and streams WAL. +// For each WAL record, it decodes the record to figure out which data blocks +// the record affects, and adds the records to the page cache. +// + use tokio_stream::StreamExt; use tokio::runtime; use tokio::time::{sleep, Duration}; @@ -12,16 +20,14 @@ use postgres_protocol::message::backend::ReplicationMessage; // // This is the entry point for the WAL receiver thread. // -// TODO: if the connection is lost, reconnect. -// pub fn thread_main() { + println!("Starting WAL receiver"); + let runtime = runtime::Builder::new_current_thread() .enable_all() .build() - .unwrap(); // FIXME don't unwrap - - println!("Starting WAL receiver"); + .unwrap(); runtime.block_on( async { loop { @@ -34,7 +40,7 @@ pub fn thread_main() { }); } -pub async fn walreceiver_main() -> Result<(), Error> { +async fn walreceiver_main() -> Result<(), Error> { // Connect to the database in replication mode. println!("connecting..."); @@ -53,8 +59,6 @@ pub async fn walreceiver_main() -> Result<(), Error> { let identify_system = rclient.identify_system().await?; - println!("identify_system"); - // // Start streaming the WAL. //