diff --git a/integration_tests/tests/control_plane/mod.rs b/integration_tests/tests/control_plane/mod.rs index 497efefe5a..eab3f345af 100644 --- a/integration_tests/tests/control_plane/mod.rs +++ b/integration_tests/tests/control_plane/mod.rs @@ -95,11 +95,11 @@ impl StorageControlPlane { cplane } - pub fn stop(&self) { + pub fn stop(&self) { for wa in self.wal_acceptors.iter() { - wa.stop(); - } - } + wa.stop(); + } + } // // postgres <-> wal_acceptor x3 <-> page_server // fn local(&mut self) -> StorageControlPlane { @@ -136,11 +136,10 @@ impl StorageControlPlane { impl Drop for StorageControlPlane { fn drop(&mut self) { - self.stop(); - } + self.stop(); + } } - pub struct PageServerNode { page_service_addr: SocketAddr, data_dir: PathBuf, diff --git a/vendor/postgres b/vendor/postgres index 4d92fa940f..b1f5a5ec14 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 4d92fa940f102e44f2ce8dc45ea1cc6da73a063a +Subproject commit b1f5a5ec145d5d9614eec4824074edae1162e5fa diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs index 99e7763576..286d563b73 100644 --- a/walkeeper/src/pq_protocol.rs +++ b/walkeeper/src/pq_protocol.rs @@ -37,7 +37,7 @@ pub enum BeMessage<'a> { pub struct FeStartupMessage { pub version: u32, pub kind: StartupRequestCode, - pub system_id: SystemId + pub system_id: SystemId, } #[derive(Debug)] @@ -79,30 +79,30 @@ impl FeStartupMessage { _ => StartupRequestCode::Normal, }; - let params_bytes = &buf[8..len]; - let params_str = str::from_utf8(&params_bytes).unwrap(); - let params = params_str.split('\0'); - let mut options = false; - let mut system_id : u64 = 0; - for p in params { - if p == "options" { - options = true; - } else if options { - for opt in p.split(' ') { - if opt.starts_with("system.id=") { - system_id = opt[10..].parse::().unwrap(); - break; - } - } - break; - } - } + let params_bytes = &buf[8..len]; + let params_str = str::from_utf8(&params_bytes).unwrap(); + let params = 
params_str.split('\0'); + let mut options = false; + let mut system_id: u64 = 0; + for p in params { + if p == "options" { + options = true; + } else if options { + for opt in p.split(' ') { + if opt.starts_with("system.id=") { + system_id = opt[10..].parse::().unwrap(); + break; + } + } + break; + } + } buf.advance(len as usize); Ok(Some(FeMessage::StartupMessage(FeStartupMessage { version, kind, - system_id, + system_id, }))) } } diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index a4f0567623..b91a966afa 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -13,6 +13,7 @@ use log::*; use regex::Regex; use std::cmp::max; use std::cmp::min; +use std::collections::HashMap; use std::fs; use std::fs::File; use std::fs::OpenOptions; @@ -21,14 +22,13 @@ use std::io::prelude::*; use std::io::SeekFrom; use std::mem; use std::str; -use std::sync::{Mutex,Arc}; +use std::sync::{Arc, Mutex}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::runtime; use tokio::sync::Notify; use tokio::task; -use std::collections::HashMap; -use tokio_postgres::{connect, NoTls, Error}; +use tokio_postgres::{connect, Error, NoTls}; use crate::pq_protocol::*; use crate::xlog_utils::*; @@ -147,9 +147,9 @@ struct SharedState { */ #[derive(Debug)] pub struct System { - id : SystemId, - mutex : Mutex, - cond: Notify, /* conditional variable used to notify wal senders */ + id: SystemId, + mutex: Mutex, + cond: Notify, /* conditional variable used to notify wal senders */ } /* @@ -385,13 +385,12 @@ async fn main_loop(conf: &WalAcceptorConf) -> Result<()> { }); } Err(e) => error!("Failed to accept connection: {}", e), - } + } } } - impl System { - pub fn new(id : SystemId) -> System { + pub fn new(id: SystemId) -> System { let shared_state = SharedState { commit_lsn: 0, info: SafeKeeperInfo::new(), @@ -403,7 +402,7 @@ impl System { }, }; System { - id: id, + id: id, mutex: Mutex::new(shared_state), cond: 
Notify::new(), } @@ -446,9 +445,10 @@ impl System { // Load and lock control file (prevent running more than one instance of safekeeper fn load_control_file(&self, conf: &WalAcceptorConf) { - let control_file_path = conf.data_dir - .join(self.id.to_string()) - .join(CONTROL_FILE_NAME); + let control_file_path = conf + .data_dir + .join(self.id.to_string()) + .join(CONTROL_FILE_NAME); match OpenOptions::new() .read(true) .write(true) @@ -519,10 +519,7 @@ impl System { } impl Connection { - pub fn new( - socket: TcpStream, - conf: &WalAcceptorConf, - ) -> Connection { + pub fn new(socket: TcpStream, conf: &WalAcceptorConf) -> Connection { Connection { system: None, stream: socket, @@ -533,9 +530,9 @@ impl Connection { } } - fn system(&self) -> Arc { - self.system.as_ref().unwrap().clone() - } + fn system(&self) -> Arc { + self.system.as_ref().unwrap().clone() + } async fn run(&mut self) -> Result<()> { self.inbuf.resize(4, 0u8); @@ -556,52 +553,53 @@ impl Connection { Ok(T::unpack(&mut self.inbuf)) } - async fn request_callback(&self) -> std::result::Result<(), Error> { - if let Some(addr) = self.conf.pageserver_addr { - let ps_connstr = format!( - "host={} port={} dbname={} user={}", - addr.ip(), - addr.port(), - "no_db", - "no_user", - ); - let callme = format!( - "callmemaybe host={} port={} replication=1 options='-c system.id={}'", - self.conf.listen_addr.ip(), - self.conf.listen_addr.port(), - self.system().get_info().server.system_id, - ); - let (client, connection) = connect(&ps_connstr, NoTls).await?; + async fn request_callback(&self) -> std::result::Result<(), Error> { + if let Some(addr) = self.conf.pageserver_addr { + let ps_connstr = format!( + "host={} port={} dbname={} user={}", + addr.ip(), + addr.port(), + "no_db", + "no_user", + ); + let callme = format!( + "callmemaybe host={} port={} replication=1 options='-c system.id={}'", + self.conf.listen_addr.ip(), + self.conf.listen_addr.port(), + self.system().get_info().server.system_id, + ); + let 
(client, connection) = connect(&ps_connstr, NoTls).await?; - // The connection object performs the actual communication with the database, - // so spawn it off to run on its own. - tokio::spawn(async move { - if let Err(e) = connection.await { - error!("pageserver connection error: {}", e); - } - }); - client.simple_query(&callme).await?; - } - Ok(()) - } + // The connection object performs the actual communication with the database, + // so spawn it off to run on its own. + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("pageserver connection error: {}", e); + } + }); + client.simple_query(&callme).await?; + } + Ok(()) + } - fn set_system(&mut self, id: SystemId) -> Result<()> { - let mut systems = SYSTEMS.lock().unwrap(); - if id == 0 { // non-multitenant configuration: just sigle instance - if let Some(system) = systems.values().next() { - self.system = Some(system.clone()); - return Ok(()); - } - io_error!("No active instances"); - } - if !systems.contains_key(&id) { - let system_dir = self.conf.data_dir.join(id.to_string()); - fs::create_dir_all(system_dir)?; - systems.insert(id, Arc::new(System::new(id))); - } - self.system = Some(systems.get(&id).unwrap().clone()); - Ok(()) - } + fn set_system(&mut self, id: SystemId) -> Result<()> { + let mut systems = SYSTEMS.lock().unwrap(); + if id == 0 { + // non-multitenant configuration: just sigle instance + if let Some(system) = systems.values().next() { + self.system = Some(system.clone()); + return Ok(()); + } + io_error!("No active instances"); + } + if !systems.contains_key(&id) { + let system_dir = self.conf.data_dir.join(id.to_string()); + fs::create_dir_all(system_dir)?; + systems.insert(id, Arc::new(System::new(id))); + } + self.system = Some(systems.get(&id).unwrap().clone()); + Ok(()) + } // Receive WAL from wal_proposer async fn receive_wal(&mut self) -> Result<()> { @@ -609,10 +607,11 @@ impl Connection { let server_info = self.read_req::().await?; info!( "Start handshake with 
wal_proposer {} sysid {}", - self.stream.peer_addr()?, server_info.system_id + self.stream.peer_addr()?, + server_info.system_id ); - self.set_system(server_info.system_id)?; - self.system().load_control_file(&self.conf); + self.set_system(server_info.system_id)?; + self.system().load_control_file(&self.conf); let mut my_info = self.system().get_info(); @@ -675,12 +674,12 @@ impl Connection { prop.node_id.pack(&mut self.outbuf); self.send().await?; - // Need to establish replication channel with page server. - // Add far as replication in postgres is initiated by receiver, we should use callme mechanism - if let Err(e) = self.request_callback().await { - // Do not treate it as fatal error and continue work - error!("Failed to send callme request to pageserver: {}", e); - } + // Need to establish replication channel with page server. + // Add far as replication in postgres is initiated by receiver, we should use callme mechanism + if let Err(e) = self.request_callback().await { + // Do not treate it as fatal error and continue work + error!("Failed to send callme request to pageserver: {}", e); + } info!( "Start streaming from server {} address {:?}", @@ -757,7 +756,8 @@ impl Connection { * Ping wal sender that new data is available. * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. 
*/ - self.system().notify_wal_senders(min(req.commit_lsn, end_pos)); + self.system() + .notify_wal_senders(min(req.commit_lsn, end_pos)); } Ok(()) } @@ -828,7 +828,7 @@ impl Connection { BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery); self.send().await?; self.init_done = true; - self.set_system(m.system_id)?; + self.set_system(m.system_id)?; } StartupRequestCode::Cancel => return Ok(()), } @@ -958,7 +958,7 @@ impl Connection { /* normal mode */ loop { // Rust doesn't allow to grab async result from mutex scope - let system = self.system(); + let system = self.system(); let notified = system.cond.notified(); { let shared_state = system.mutex.lock().unwrap(); @@ -997,15 +997,19 @@ impl Connection { } else { let segno = XLByteToSeg(start_pos, wal_seg_size); let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); - let wal_file_path = self.conf.data_dir - .join(self.system().id.to_string()) - .join(wal_file_name.clone() + ".partial"); + let wal_file_path = self + .conf + .data_dir + .join(self.system().id.to_string()) + .join(wal_file_name.clone() + ".partial"); if let Ok(opened_file) = File::open(&wal_file_path) { file = opened_file; } else { - let wal_file_path = self.conf.data_dir - .join(self.system().id.to_string()) - .join(wal_file_name); + let wal_file_path = self + .conf + .data_dir + .join(self.system().id.to_string()) + .join(wal_file_name); match File::open(&wal_file_path) { Ok(opened_file) => file = opened_file, Err(e) => { @@ -1084,12 +1088,16 @@ impl Connection { /* Open file */ let segno = XLByteToSeg(start_pos, wal_seg_size); let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); - let wal_file_path = self.conf.data_dir - .join(self.system().id.to_string()) - .join(wal_file_name.clone()); - let wal_file_partial_path = self.conf.data_dir - .join(self.system().id.to_string()) - .join(wal_file_name.clone() + ".partial"); + let wal_file_path = self + .conf + .data_dir + .join(self.system().id.to_string()) + 
.join(wal_file_name.clone()); + let wal_file_partial_path = self + .conf + .data_dir + .join(self.system().id.to_string()) + .join(wal_file_name.clone() + ".partial"); { let mut wal_file: File;