mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
[refer #182] Make walkeeper periodically send callme requests to pageserver
This commit is contained in:
committed by
Stas Kelvich
parent
fd20101e5c
commit
e0cc4dee4f
@@ -343,7 +343,7 @@ impl WalAcceptorNode {
|
||||
|
||||
let ps_arg = if self.pass_to_pageserver {
|
||||
// Tell page server it can receive WAL from this WAL safekeeper
|
||||
["--pageserver", "127.0.0.1:64000"].to_vec()
|
||||
["--pageserver", "127.0.0.1:64000", "--recall", "1 second"].to_vec()
|
||||
} else {
|
||||
[].to_vec()
|
||||
};
|
||||
|
||||
@@ -53,6 +53,12 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.help("interval for keeping WAL as walkeeper node, after which them will be uploaded to S3 and removed locally"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("recall-period")
|
||||
.long("recall")
|
||||
.takes_value(true)
|
||||
.help("Period for requestion pageserver to call for replication"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("daemonize")
|
||||
.short("d")
|
||||
@@ -80,6 +86,7 @@ fn main() -> Result<()> {
|
||||
pageserver_addr: None,
|
||||
listen_addr: "127.0.0.1:5454".parse()?,
|
||||
ttl: None,
|
||||
recall_period: None,
|
||||
};
|
||||
|
||||
if let Some(dir) = arg_matches.value_of("datadir") {
|
||||
@@ -109,6 +116,10 @@ fn main() -> Result<()> {
|
||||
conf.ttl = Some::<Duration>(parse(ttl)?);
|
||||
}
|
||||
|
||||
if let Some(recall) = arg_matches.value_of("recall") {
|
||||
conf.recall_period = Some::<Duration>(parse(recall)?);
|
||||
}
|
||||
|
||||
start_wal_acceptor(conf)
|
||||
}
|
||||
|
||||
|
||||
@@ -22,4 +22,5 @@ pub struct WalAcceptorConf {
|
||||
pub listen_addr: SocketAddr,
|
||||
pub pageserver_addr: Option<SocketAddr>,
|
||||
pub ttl: Option<Duration>,
|
||||
pub recall_period: Option<Duration>,
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@ use std::io::{BufReader, Read, Seek, SeekFrom, Write};
|
||||
use std::net::{SocketAddr, TcpStream};
|
||||
use std::str;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::thread::sleep;
|
||||
use zenith_utils::bin_ser::LeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
@@ -142,6 +144,48 @@ pub struct ReceiveWalConn {
|
||||
pub conf: WalAcceptorConf,
|
||||
}
|
||||
|
||||
///
|
||||
/// Periodically request pageserver to call back.
|
||||
/// If pageserver already has replication channel, it will just ignore this request
|
||||
///
|
||||
fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId) {
|
||||
let addr = conf.pageserver_addr.unwrap();
|
||||
let ps_connstr = format!(
|
||||
"host={} port={} dbname={} user={}",
|
||||
addr.ip(),
|
||||
addr.port(),
|
||||
"no_db",
|
||||
"no_user",
|
||||
);
|
||||
let callme = format!(
|
||||
"callmemaybe {} host={} port={} options='-c ztimelineid={}'",
|
||||
timelineid,
|
||||
conf.listen_addr.ip(),
|
||||
conf.listen_addr.port(),
|
||||
timelineid
|
||||
);
|
||||
loop {
|
||||
info!(
|
||||
"requesting page server to connect to us: start {} {}",
|
||||
ps_connstr, callme
|
||||
);
|
||||
match Client::connect(&ps_connstr, NoTls) {
|
||||
Ok(mut client) => {
|
||||
if let Err(e) = client.simple_query(&callme) {
|
||||
error!("Failed to send callme request to pageserver: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to connect to pageserver {}: {}", &ps_connstr, e),
|
||||
}
|
||||
|
||||
if let Some(period) = conf.recall_period {
|
||||
sleep(period);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceiveWalConn {
|
||||
pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result<ReceiveWalConn> {
|
||||
let peer_addr = socket.peer_addr()?;
|
||||
@@ -160,32 +204,6 @@ impl ReceiveWalConn {
|
||||
Ok(T::des_from(&mut self.stream_in)?)
|
||||
}
|
||||
|
||||
fn request_callback(&self) -> std::result::Result<(), postgres::error::Error> {
|
||||
if let Some(addr) = self.conf.pageserver_addr {
|
||||
let ps_connstr = format!(
|
||||
"host={} port={} dbname={} user={}",
|
||||
addr.ip(),
|
||||
addr.port(),
|
||||
"no_db",
|
||||
"no_user",
|
||||
);
|
||||
let callme = format!(
|
||||
"callmemaybe {} host={} port={} options='-c ztimelineid={}'",
|
||||
self.timeline.get().timelineid,
|
||||
self.conf.listen_addr.ip(),
|
||||
self.conf.listen_addr.port(),
|
||||
self.timeline.get().timelineid
|
||||
);
|
||||
info!(
|
||||
"requesting page server to connect to us: start {} {}",
|
||||
ps_connstr, callme
|
||||
);
|
||||
let mut client = Client::connect(&ps_connstr, NoTls)?;
|
||||
client.simple_query(&callme)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Receive WAL from wal_proposer
|
||||
pub fn run(&mut self) -> Result<()> {
|
||||
// Receive information about server
|
||||
@@ -254,12 +272,14 @@ impl ReceiveWalConn {
|
||||
/* Acknowledge the proposed candidate by returning it to the proxy */
|
||||
prop.node_id.ser_into(&mut self.stream_out)?;
|
||||
|
||||
// Need to establish replication channel with page server.
|
||||
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
|
||||
if let Err(e) = self.request_callback() {
|
||||
// Do not treate it as fatal error and continue work
|
||||
// FIXME: we should retry after a while...
|
||||
error!("Failed to send callme request to pageserver: {}", e);
|
||||
if self.conf.pageserver_addr.is_some() {
|
||||
// Need to establish replication channel with page server.
|
||||
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
|
||||
let conf = self.conf.clone();
|
||||
let timelineid = self.timeline.get().timelineid;
|
||||
thread::spawn(move || {
|
||||
request_callback(conf, timelineid);
|
||||
});
|
||||
}
|
||||
|
||||
info!(
|
||||
|
||||
Reference in New Issue
Block a user