From 3fea78d688bba05c585561d9559f7f51bceaabdf Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 7 Apr 2021 13:43:40 +0300 Subject: [PATCH] Multitenant wal_acceptor --- walkeeper/src/bin/wal_acceptor.rs | 14 ++- walkeeper/src/lib.rs | 1 + walkeeper/src/pq_protocol.rs | 23 ++++ walkeeper/src/wal_service.rs | 193 ++++++++++++++++++++---------- 4 files changed, 169 insertions(+), 62 deletions(-) diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index c4ba59ced9..dc6d3a5111 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -34,7 +34,14 @@ fn main() -> Result<(), io::Error> { .short("l") .long("listen") .takes_value(true) - .help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"), + .help("listen for incoming page requests on ip:port (default: 127.0.0.1:5454)"), + ) + .arg( + Arg::with_name("pageserver") + .short("p") + .long("pageserver") + .takes_value(true) + .help("address ip:port of pageserver with which wal_acceptor should establish connection"), ) .arg( Arg::with_name("daemonize") @@ -56,6 +63,7 @@ fn main() -> Result<(), io::Error> { data_dir: PathBuf::from("./"), daemonize: false, no_sync: false, + pageserver_addr: None, listen_addr: "127.0.0.1:5454".parse().unwrap(), }; @@ -75,6 +83,10 @@ fn main() -> Result<(), io::Error> { conf.listen_addr = addr.parse().unwrap(); } + if let Some(addr) = arg_matches.value_of("pageserver") { + conf.pageserver_addr = Some(addr.parse().unwrap()); + } + start_wal_acceptor(conf) } diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 28fe52ae01..7dbb0b17f3 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -13,4 +13,5 @@ pub struct WalAcceptorConf { pub daemonize: bool, pub no_sync: bool, pub listen_addr: SocketAddr, + pub pageserver_addr: Option, } diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs index 1e2f7902ba..99e7763576 100644 --- a/walkeeper/src/pq_protocol.rs +++ 
b/walkeeper/src/pq_protocol.rs @@ -1,8 +1,10 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use std::io; +use std::str; pub type Oid = u32; +pub type SystemId = u64; pub type Result = std::result::Result; #[derive(Debug)] @@ -35,6 +37,7 @@ pub enum BeMessage<'a> { pub struct FeStartupMessage { pub version: u32, pub kind: StartupRequestCode, + pub system_id: SystemId } #[derive(Debug)] @@ -76,10 +79,30 @@ impl FeStartupMessage { _ => StartupRequestCode::Normal, }; + let params_bytes = &buf[8..len]; + let params_str = str::from_utf8(¶ms_bytes).unwrap(); + let params = params_str.split('\0'); + let mut options = false; + let mut system_id : u64 = 0; + for p in params { + if p == "options" { + options = true; + } else if options { + for opt in p.split(' ') { + if opt.starts_with("system.id=") { + system_id = opt[10..].parse::().unwrap(); + break; + } + } + break; + } + } + buf.advance(len as usize); Ok(Some(FeMessage::StartupMessage(FeStartupMessage { version, kind, + system_id, }))) } } diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 0f14cd2ae8..26fa0209cc 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -21,12 +21,14 @@ use std::io::prelude::*; use std::io::SeekFrom; use std::mem; use std::str; -use std::sync::Mutex; +use std::sync::{Mutex,Arc}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::runtime; use tokio::sync::Notify; use tokio::task; +use std::collections::HashMap; +use tokio_postgres::{connect, NoTls, Error}; use crate::pq_protocol::*; use crate::xlog_utils::*; @@ -62,7 +64,7 @@ struct ServerInfo { protocol_version: u32, /* proxy-safekeeper protocol version */ pg_version: u32, /* Postgres server version */ node_id: NodeId, - system_id: u64, /* Postgres system identifier */ + system_id: SystemId, /* Postgres system identifier */ wal_end: XLogRecPtr, timeline: TimeLineID, wal_seg_size: u32, @@ -130,7 +132,7 @@ 
struct SafeKeeperResponse { } /* - * State shared by safekeeper tasks and protected by mutex + * Shared state associated with database instance (tenant) */ #[derive(Debug)] struct SharedState { @@ -141,12 +143,13 @@ struct SharedState { } /* - * Static data + * Database instance (tenant) */ #[derive(Debug)] -pub struct WalAcceptor { - mutex: Mutex, /* mutext for protecting shared state */ - cond: Notify, /* conditional variable used to notify wal senders */ +pub struct System { + id : SystemId, + mutex : Mutex, + cond: Notify, /* conditional variable used to notify wal senders */ } /* @@ -154,7 +157,7 @@ pub struct WalAcceptor { */ #[derive(Debug)] struct Connection { - acceptor: &'static WalAcceptor, + system: Option>, stream: TcpStream, /* Postgres connection */ inbuf: BytesMut, /* input buffer */ outbuf: BytesMut, /* output buffer */ @@ -346,7 +349,7 @@ impl Serializer for SafeKeeperResponse { } lazy_static! { - pub static ref SELF: WalAcceptor = WalAcceptor::new(); + pub static ref SYSTEMS: Mutex>> = Mutex::new(HashMap::new()); } pub fn thread_main(conf: WalAcceptorConf) { @@ -362,15 +365,33 @@ pub fn thread_main(conf: WalAcceptorConf) { info!("Starting wal acceptor on {}", conf.listen_addr); - SELF.load_control_file(&conf); - runtime.block_on(async { - let _unused = SELF.main_loop(&conf).await; + let _unused = main_loop(&conf).await; }); } -impl WalAcceptor { - pub fn new() -> WalAcceptor { +async fn main_loop(conf: &WalAcceptorConf) -> Result<()> { + let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?; + loop { + match listener.accept().await { + Ok((socket, peer_addr)) => { + debug!("accepted connection from {}", peer_addr); + socket.set_nodelay(true)?; + let mut conn = Connection::new(socket, &conf); + task::spawn(async move { + if let Err(err) = conn.run().await { + error!("error: {}", err); + } + }); + } + Err(e) => error!("Failed to accept connection: {}", e), + } + } +} + + +impl System { + pub fn new(id : SystemId) -> System 
{ let shared_state = SharedState { commit_lsn: 0, info: SafeKeeperInfo::new(), @@ -381,7 +402,8 @@ impl WalAcceptor { catalog_xmin: u64::MAX, }, }; - WalAcceptor { + System { + id: id, mutex: Mutex::new(shared_state), cond: Notify::new(), } @@ -424,7 +446,9 @@ impl WalAcceptor { // Load and lock control file (prevent running more than one instane of safekeeper */ fn load_control_file(&self, conf: &WalAcceptorConf) { - let control_file_path = conf.data_dir.join(CONTROL_FILE_NAME); + let control_file_path = conf.data_dir + .join(self.id.to_string()) + .join(CONTROL_FILE_NAME); match OpenOptions::new() .read(true) .write(true) @@ -492,35 +516,15 @@ impl WalAcceptor { } Ok(()) } - - async fn main_loop(&'static self, conf: &WalAcceptorConf) -> Result<()> { - let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?; - loop { - match listener.accept().await { - Ok((socket, peer_addr)) => { - debug!("accepted connection from {}", peer_addr); - socket.set_nodelay(true)?; - let mut conn = Connection::new(self, socket, &conf); - task::spawn(async move { - if let Err(err) = conn.run().await { - error!("error: {}", err); - } - }); - } - Err(e) => error!("Failed to accept connection: {}", e), - } - } - } } impl Connection { pub fn new( - acceptor: &'static WalAcceptor, socket: TcpStream, conf: &WalAcceptorConf, ) -> Connection { Connection { - acceptor: acceptor, + system: None, stream: socket, inbuf: BytesMut::with_capacity(10 * 1024), outbuf: BytesMut::with_capacity(10 * 1024), @@ -529,6 +533,10 @@ impl Connection { } } + fn system(&self) -> Arc { + self.system.as_ref().unwrap().clone() + } + async fn run(&mut self) -> Result<()> { self.inbuf.resize(4, 0u8); self.stream.read_exact(&mut self.inbuf[0..4]).await?; @@ -548,15 +556,65 @@ impl Connection { Ok(T::unpack(&mut self.inbuf)) } + async fn request_callback(&self) -> std::result::Result<(), Error> { + if let Some(addr) = self.conf.pageserver_addr { + let ps_connstr = format!( + "host={} port={} 
dbname={} user={}", + addr.ip(), + addr.port(), + "no_db", + "no_user", + ); + let callme = format!( + "callmemaybe host={} port={} replication=1 options='-c system.id={}'", + self.conf.listen_addr.ip(), + self.conf.listen_addr.port(), + self.system().get_info().server.system_id, + ); + let (client, connection) = connect(&ps_connstr, NoTls).await?; + + // The connection object performs the actual communication with the database, + // so spawn it off to run on its own. + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("pageserver connection error: {}", e); + } + }); + client.simple_query(&callme).await?; + } + Ok(()) + } + + fn set_system(&mut self, id: SystemId) -> Result<()> { + let mut systems = SYSTEMS.lock().unwrap(); + if id == 0 { // non-multitenant configuration: just a single instance + if let Some(system) = systems.values().next() { + self.system = Some(system.clone()); + return Ok(()); + } + io_error!("No active instances"); + } + if !systems.contains_key(&id) { + let system_dir = self.conf.data_dir.join(id.to_string()); + fs::create_dir_all(system_dir)?; + systems.insert(id, Arc::new(System::new(id))); + } + self.system = Some(systems.get(&id).unwrap().clone()); + Ok(()) + } + // Receive WAL from wal_proposer async fn receive_wal(&mut self) -> Result<()> { - let mut my_info = self.acceptor.get_info(); // Receive information about server let server_info = self.read_req::().await?; info!( - "Start handshake with wal_proposer {}", - self.stream.peer_addr()? 
+ "Start handshake with wal_proposer {} sysid {}", + self.stream.peer_addr()?, server_info.system_id ); + self.set_system(server_info.system_id)?; + self.system().load_control_file(&self.conf); + + let mut my_info = self.system().get_info(); /* Check protocol compatibility */ if server_info.protocol_version != SK_PROTOCOL_VERSION { @@ -605,9 +663,9 @@ impl Connection { ); } my_info.server.node_id = prop.node_id; - self.acceptor.set_info(&my_info); + self.system().set_info(&my_info); /* Need to persist our vote first */ - self.acceptor.save_control_file(true)?; + self.system().save_control_file(true)?; let mut flushed_restart_lsn: XLogRecPtr = 0; let wal_seg_size = server_info.wal_seg_size as usize; @@ -617,6 +675,13 @@ impl Connection { prop.node_id.pack(&mut self.outbuf); self.send().await?; + // Need to establish replication channel with page server. + // Since replication in postgres is initiated by the receiver, we should use the callme mechanism + if let Err(e) = self.request_callback().await { + // Do not treat it as a fatal error; continue working + error!("Failed to send callme request to pageserver: {}", e); + } + info!( "Start streaming from server {} address {:?}", server_info.system_id, @@ -671,7 +736,7 @@ impl Connection { * when restart_lsn delta exceeds WAL segment size. */ sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn; - self.acceptor.save_control_file(sync_control_file)?; + self.system().save_control_file(sync_control_file)?; if sync_control_file { flushed_restart_lsn = my_info.restart_lsn; @@ -682,7 +747,7 @@ impl Connection { let resp = SafeKeeperResponse { epoch: my_info.epoch, flush_lsn: end_pos, - hs_feedback: self.acceptor.get_hs_feedback(), + hs_feedback: self.system().get_hs_feedback(), }; self.start_sending(); resp.pack(&mut self.outbuf); @@ -692,8 +757,7 @@ impl Connection { * Ping wal sender that new data is available. 
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. */ - self.acceptor - .notify_wal_senders(min(req.commit_lsn, end_pos)); + self.system().notify_wal_senders(min(req.commit_lsn, end_pos)); } Ok(()) } @@ -711,10 +775,7 @@ impl Connection { if self.inbuf.is_empty() { return Ok(None); } else { - return Err(io::Error::new( - io::ErrorKind::Other, - "connection reset by peer", - )); + io_error!("connection reset by peer"); } } } @@ -767,6 +828,7 @@ impl Connection { BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery); self.send().await?; self.init_done = true; + self.set_system(m.system_id)?; } StartupRequestCode::Cancel => return Ok(()), } @@ -784,7 +846,7 @@ impl Connection { break; } _ => { - return Err(io::Error::new(io::ErrorKind::Other, "unexpected message")); + io_error!("unexpected message"); } } } @@ -799,7 +861,7 @@ impl Connection { let (start_pos, timeline) = self.find_end_of_wal(false); let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32); let tli = timeline.to_string(); - let sysid = self.acceptor.get_info().server.system_id.to_string(); + let sysid = self.system().get_info().server.system_id.to_string(); let lsn_bytes = lsn.as_bytes(); let tli_bytes = tli.as_bytes(); let sysid_bytes = sysid.as_bytes(); @@ -855,7 +917,7 @@ impl Connection { } else { 0 }; - let wal_seg_size = self.acceptor.get_info().server.wal_seg_size as usize; + let wal_seg_size = self.system().get_info().server.wal_seg_size as usize; if wal_seg_size == 0 { io_error!("Can not start replication before connecting to wal_proposer"); } @@ -896,9 +958,10 @@ impl Connection { /* normal mode */ loop { // Rust doesn't allow to grab async result from mutex scope - let notified = self.acceptor.cond.notified(); + let system = self.system(); + let notified = system.cond.notified(); { - let shared_state = self.acceptor.mutex.lock().unwrap(); + let shared_state = system.mutex.lock().unwrap(); commit_lsn = 
shared_state.commit_lsn; if start_pos < commit_lsn { end_pos = commit_lsn; @@ -916,7 +979,7 @@ impl Connection { Ok(0) => break, Ok(_) => match self.parse_message()? { Some(FeMessage::CopyData(m)) => self - .acceptor + .system() .add_hs_feedback(HotStandbyFeedback::parse(&m.body)), _ => {} }, @@ -934,11 +997,15 @@ impl Connection { } else { let segno = XLByteToSeg(start_pos, wal_seg_size); let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); - let wal_file_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial"); + let wal_file_path = self.conf.data_dir + .join(self.system().id.to_string()) + .join(wal_file_name.clone() + ".partial"); if let Ok(opened_file) = File::open(&wal_file_path) { file = opened_file; } else { - let wal_file_path = self.conf.data_dir.join(wal_file_name); + let wal_file_path = self.conf.data_dir + .join(self.system().id.to_string()) + .join(wal_file_name); match File::open(&wal_file_path) { Ok(opened_file) => file = opened_file, Err(e) => { @@ -1017,8 +1084,12 @@ impl Connection { /* Open file */ let segno = XLByteToSeg(start_pos, wal_seg_size); let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); - let wal_file_path = self.conf.data_dir.join(wal_file_name.clone()); - let wal_file_partial_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial"); + let wal_file_path = self.conf.data_dir + .join(self.system().id.to_string()) + .join(wal_file_name.clone()); + let wal_file_partial_path = self.conf.data_dir + .join(self.system().id.to_string()) + .join(wal_file_name.clone() + ".partial"); { let mut wal_file: File; @@ -1080,7 +1151,7 @@ impl Connection { fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) { find_end_of_wal( &self.conf.data_dir, - self.acceptor.get_info().server.wal_seg_size as usize, + self.system().get_info().server.wal_seg_size as usize, precise, ) }