diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index c8e41b3058..7b56cc091f 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -343,7 +343,7 @@ impl WalAcceptorNode { let ps_arg = if self.pass_to_pageserver { // Tell page server it can receive WAL from this WAL safekeeper - ["--pageserver", "127.0.0.1:64000"].to_vec() + ["--pageserver", "127.0.0.1:64000", "--recall", "1 second"].to_vec() } else { [].to_vec() }; diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index fed8f7746f..35ac5031de 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -53,6 +53,12 @@ fn main() -> Result<()> { .takes_value(true) .help("interval for keeping WAL as walkeeper node, after which them will be uploaded to S3 and removed locally"), ) + .arg( + Arg::with_name("recall-period") + .long("recall") + .takes_value(true) + .help("Period for requestion pageserver to call for replication"), + ) .arg( Arg::with_name("daemonize") .short("d") @@ -80,6 +86,7 @@ fn main() -> Result<()> { pageserver_addr: None, listen_addr: "127.0.0.1:5454".parse()?, ttl: None, + recall_period: None, }; if let Some(dir) = arg_matches.value_of("datadir") { @@ -109,6 +116,10 @@ fn main() -> Result<()> { conf.ttl = Some::(parse(ttl)?); } + if let Some(recall) = arg_matches.value_of("recall") { + conf.recall_period = Some::(parse(recall)?); + } + start_wal_acceptor(conf) } diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index e296d8ec82..6926fcf95a 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -22,4 +22,5 @@ pub struct WalAcceptorConf { pub listen_addr: SocketAddr, pub pageserver_addr: Option, pub ttl: Option, + pub recall_period: Option, } diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 8f22c44a55..a9e75f040b 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -12,6 +12,8 @@ use std::io::{BufReader, Read, Seek, SeekFrom, Write}; use std::net::{SocketAddr, TcpStream}; use std::str; use std::sync::Arc; +use std::thread; +use std::thread::sleep; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; @@ -142,6 +144,48 @@ pub struct ReceiveWalConn { pub conf: WalAcceptorConf, } +/// +/// Periodically request pageserver to call back. +/// If pageserver already has replication channel, it will just ignore this request +/// +fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId) { + let addr = conf.pageserver_addr.unwrap(); + let ps_connstr = format!( + "host={} port={} dbname={} user={}", + addr.ip(), + addr.port(), + "no_db", + "no_user", + ); + let callme = format!( + "callmemaybe {} host={} port={} options='-c ztimelineid={}'", + timelineid, + conf.listen_addr.ip(), + conf.listen_addr.port(), + timelineid + ); + loop { + info!( + "requesting page server to connect to us: start {} {}", + ps_connstr, callme + ); + match Client::connect(&ps_connstr, NoTls) { + Ok(mut client) => { + if let Err(e) = client.simple_query(&callme) { + error!("Failed to send callme request to pageserver: {}", e); + } + } + Err(e) => error!("Failed to connect to pageserver {}: {}", &ps_connstr, e), + } + + if let Some(period) = conf.recall_period { + sleep(period); + } else { + break; + } + } +} + impl ReceiveWalConn { pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result { let peer_addr = socket.peer_addr()?; @@ -160,32 +204,6 @@ impl ReceiveWalConn { Ok(T::des_from(&mut self.stream_in)?) } - fn request_callback(&self) -> std::result::Result<(), postgres::error::Error> { - if let Some(addr) = self.conf.pageserver_addr { - let ps_connstr = format!( - "host={} port={} dbname={} user={}", - addr.ip(), - addr.port(), - "no_db", - "no_user", - ); - let callme = format!( - "callmemaybe {} host={} port={} options='-c ztimelineid={}'", - self.timeline.get().timelineid, - self.conf.listen_addr.ip(), - self.conf.listen_addr.port(), - self.timeline.get().timelineid - ); - info!( - "requesting page server to connect to us: start {} {}", - ps_connstr, callme - ); - let mut client = Client::connect(&ps_connstr, NoTls)?; - client.simple_query(&callme)?; - } - Ok(()) - } - /// Receive WAL from wal_proposer pub fn run(&mut self) -> Result<()> { // Receive information about server @@ -254,12 +272,14 @@ impl ReceiveWalConn { /* Acknowledge the proposed candidate by returning it to the proxy */ prop.node_id.ser_into(&mut self.stream_out)?; - // Need to establish replication channel with page server. - // Add far as replication in postgres is initiated by receiver, we should use callme mechanism - if let Err(e) = self.request_callback() { - // Do not treate it as fatal error and continue work - // FIXME: we should retry after a while... - error!("Failed to send callme request to pageserver: {}", e); + if self.conf.pageserver_addr.is_some() { + // Need to establish replication channel with page server. + // Add far as replication in postgres is initiated by receiver, we should use callme mechanism + let conf = self.conf.clone(); + let timelineid = self.timeline.get().timelineid; + thread::spawn(move || { + request_callback(conf, timelineid); + }); } info!(