From cdb20bf2a27daf79d416fce9bd06d191536a346e Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 16 Mar 2021 11:36:19 +0200 Subject: [PATCH] More work on the page server's WAL receiver Add comments. Decode all records received in one XLogData, not just the first one. --- src/main.rs | 2 +- src/{walreader.rs => waldecoder.rs} | 0 src/walreceiver.rs | 47 ++++++++++++++++++----------- 3 files changed, 31 insertions(+), 18 deletions(-) rename src/{walreader.rs => waldecoder.rs} (100%) diff --git a/src/main.rs b/src/main.rs index 3f4e54cf38..ac27740ffe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use std::thread; mod page_cache; -mod walreader; +mod waldecoder; mod walreceiver; use std::io::Error; diff --git a/src/walreader.rs b/src/waldecoder.rs similarity index 100% rename from src/walreader.rs rename to src/waldecoder.rs diff --git a/src/walreceiver.rs b/src/walreceiver.rs index 1372a4e4b8..3eb6d68f5f 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -1,17 +1,16 @@ -//use crate::page_cache::*; - -//use postgres_protocol::message::backend::XLogDataBody; - use tokio_stream::StreamExt; use tokio::runtime; -use crate::walreader::WalStreamDecoder; +use crate::waldecoder::WalStreamDecoder; use tokio_postgres::{connect_replication, NoTls, Error, ReplicationMode}; use postgres_protocol::message::backend::ReplicationMessage; -//use ReplicationMessage::XLogData; - +// +// This is the entry point for the WAL receiver thread. +// +// TODO: if the connection is lost, reconnect. +// pub fn thread_main() { let runtime = runtime::Builder::new_current_thread() @@ -27,10 +26,9 @@ pub fn thread_main() { } pub async fn walreceiver_main() -> Result<(), Error> { - // Connect to the database. + // Connect to the database in replication mode. println!("connecting..."); - let (mut rclient, connection) = connect_replication("host=localhost user=heikki", NoTls, ReplicationMode::Physical).await?; @@ -48,24 +46,39 @@ pub async fn walreceiver_main() -> Result<(), Error> { println!("identify_system"); + // + // Start streaming the WAL. + // + // TODO: currently, we start streaming at the primary's last insert location. + // We should start at the last LSN that we had streamed previously, instead. + // let mut physical_stream = rclient .start_physical_replication(None, identify_system.xlogpos(), None) .await.unwrap(); - let mut walreader = WalStreamDecoder::new(u64::from(identify_system.xlogpos())); + let mut waldecoder = WalStreamDecoder::new(u64::from(identify_system.xlogpos())); - //let record = walreader.read_record(&mut physical_stream); - while let Some(replication_message) = physical_stream.next().await { match replication_message? { ReplicationMessage::XLogData(xlog_data) => { - println!("received XLogData:"); - walreader.feed_bytes(xlog_data.data()); - let rec = walreader.poll_decode(); - if rec.is_some() { - crate::walreader::decode_wal_record(&rec.unwrap()); + println!("received XLogData"); + + // Pass the WAL data to the decoder, and see if we can decode + // more records as a result. + waldecoder.feed_bytes(xlog_data.data()); + + loop { + let rec = waldecoder.poll_decode(); + + if rec.is_none() { + break; + } + + crate::waldecoder::decode_wal_record(&rec.unwrap()); println!("decoded record"); + + // TODO: Put the WAL record to the page cache } } ReplicationMessage::PrimaryKeepAlive(_keepalive) => {