More work on the page server's WAL receiver

Add comments. Decode all records received in one XLogData, not just the
first one.
This commit is contained in:
Heikki Linnakangas
2021-03-16 11:36:19 +02:00
committed by Stas Kelvich
parent eac04d7f10
commit cdb20bf2a2
3 changed files with 31 additions and 18 deletions

View File

@@ -1,7 +1,7 @@
use std::thread;
mod page_cache;
mod walreader;
mod waldecoder;
mod walreceiver;
use std::io::Error;

View File

@@ -1,17 +1,16 @@
//use crate::page_cache::*;
//use postgres_protocol::message::backend::XLogDataBody;
use tokio_stream::StreamExt;
use tokio::runtime;
use crate::walreader::WalStreamDecoder;
use crate::waldecoder::WalStreamDecoder;
use tokio_postgres::{connect_replication, NoTls, Error, ReplicationMode};
use postgres_protocol::message::backend::ReplicationMessage;
//use ReplicationMessage::XLogData;
//
// This is the entry point for the WAL receiver thread.
//
// TODO: if the connection is lost, reconnect.
//
pub fn thread_main() {
let runtime = runtime::Builder::new_current_thread()
@@ -27,10 +26,9 @@ pub fn thread_main() {
}
pub async fn walreceiver_main() -> Result<(), Error> {
// Connect to the database.
// Connect to the database in replication mode.
println!("connecting...");
let (mut rclient, connection) =
connect_replication("host=localhost user=heikki", NoTls, ReplicationMode::Physical).await?;
@@ -48,24 +46,39 @@ pub async fn walreceiver_main() -> Result<(), Error> {
println!("identify_system");
//
// Start streaming the WAL.
//
// TODO: currently, we start streaming at the primary's last insert location.
// We should start at the last LSN that we had streamed previously, instead.
//
let mut physical_stream = rclient
.start_physical_replication(None, identify_system.xlogpos(), None)
.await.unwrap();
let mut walreader = WalStreamDecoder::new(u64::from(identify_system.xlogpos()));
let mut waldecoder = WalStreamDecoder::new(u64::from(identify_system.xlogpos()));
//let record = walreader.read_record(&mut physical_stream);
while let Some(replication_message) = physical_stream.next().await {
match replication_message? {
ReplicationMessage::XLogData(xlog_data) => {
println!("received XLogData:");
walreader.feed_bytes(xlog_data.data());
let rec = walreader.poll_decode();
if rec.is_some() {
crate::walreader::decode_wal_record(&rec.unwrap());
println!("received XLogData");
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
waldecoder.feed_bytes(xlog_data.data());
loop {
let rec = waldecoder.poll_decode();
if rec.is_none() {
break;
}
crate::waldecoder::decode_wal_record(&rec.unwrap());
println!("decoded record");
// TODO: Put the WAL record to the page cache
}
}
ReplicationMessage::PrimaryKeepAlive(_keepalive) => {