From c058d0425004e1754e8a89877f72cbf439358675 Mon Sep 17 00:00:00 2001 From: Egor Suvorov Date: Thu, 21 Oct 2021 18:26:43 +0300 Subject: [PATCH] Rename WalAcceptor to Safekeeper in most places (#741) --- control_plane/src/compute.rs | 6 +++--- walkeeper/src/bin/safekeeper.rs | 10 +++++----- walkeeper/src/http/routes.rs | 8 ++++---- walkeeper/src/lib.rs | 2 +- walkeeper/src/receive_wal.rs | 4 ++-- walkeeper/src/s3_offload.rs | 8 ++++---- walkeeper/src/send_wal.rs | 8 ++++---- walkeeper/src/timeline.rs | 14 +++++++------- walkeeper/src/wal_service.rs | 6 +++--- 9 files changed, 33 insertions(+), 33 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index fb98eeca03..a2e0b24524 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -210,7 +210,7 @@ impl PostgresNode { }) } - fn sync_walkeepers(&self) -> Result { + fn sync_safekeepers(&self) -> Result { let pg_path = self.env.pg_bin_dir().join("postgres"); let sync_handle = Command::new(pg_path) .arg("--sync-safekeepers") @@ -235,7 +235,7 @@ impl PostgresNode { } let lsn = Lsn::from_str(std::str::from_utf8(&sync_output.stdout)?.trim())?; - println!("Walkeepers synced on {}", lsn); + println!("Safekeepers synced on {}", lsn); Ok(lsn) } @@ -357,7 +357,7 @@ impl PostgresNode { // latest data from the pageserver. That is a bit clumsy but whole bootstrap // procedure evolves quite actively right now, so let's think about it again // when things would be more stable (TODO). - let lsn = self.sync_walkeepers()?; + let lsn = self.sync_safekeepers()?; if lsn == Lsn(0) { None } else { diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index 7ce8765789..4c4e3eff5b 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -17,7 +17,7 @@ use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; use walkeeper::http; use walkeeper::s3_offload; use walkeeper::wal_service; -use walkeeper::WalAcceptorConf; +use walkeeper::SafeKeeperConf; fn main() -> Result<()> { zenith_metrics::set_common_metrics_prefix("safekeeper"); @@ -54,7 +54,7 @@ fn main() -> Result<()> { Arg::with_name("ttl") .long("ttl") .takes_value(true) - .help("interval for keeping WAL as walkeeper node, after which them will be uploaded to S3 and removed locally"), + .help("interval for keeping WAL at safekeeper node, after which them will be uploaded to S3 and removed locally"), ) .arg( Arg::with_name("recall") @@ -78,7 +78,7 @@ fn main() -> Result<()> { ) .get_matches(); - let mut conf = WalAcceptorConf { + let mut conf = SafeKeeperConf { data_dir: PathBuf::from("./"), daemonize: false, no_sync: false, @@ -125,10 +125,10 @@ fn main() -> Result<()> { conf.recall_period = Some(humantime::parse_duration(recall)?); } - start_wal_acceptor(conf) + start_safekeeper(conf) } -fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> { +fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { let log_filename = conf.data_dir.join("safekeeper.log"); let log_file = logging::init(log_filename, conf.daemonize)?; diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index f5a69d3522..9b4f9df5b8 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -10,7 +10,7 @@ use zenith_utils::lsn::Lsn; use crate::safekeeper::AcceptorState; use crate::timeline::CreateControlFile; use crate::timeline::GlobalTimelines; -use crate::WalAcceptorConf; +use crate::SafeKeeperConf; use zenith_utils::http::endpoint; use zenith_utils::http::error::ApiError; use zenith_utils::http::json::json_response; @@ -22,9 +22,9 @@ async fn status_handler(_: Request) -> Result, ApiError> { Ok(json_response(StatusCode::OK, "")?) } -fn get_conf(request: &Request) -> &WalAcceptorConf { +fn get_conf(request: &Request) -> &SafeKeeperConf { request - .data::>() + .data::>() .expect("unknown state type") .as_ref() } @@ -80,7 +80,7 @@ async fn timeline_status_handler(request: Request) -> Result RouterBuilder { +pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder { let router = endpoint::make_router(); router .data(Arc::new(conf)) diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 4406823076..8e9b1de77a 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -23,7 +23,7 @@ pub mod defaults { } #[derive(Debug, Clone)] -pub struct WalAcceptorConf { +pub struct SafeKeeperConf { pub data_dir: PathBuf, pub daemonize: bool, pub no_sync: bool, diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 527c8d891c..71000b1764 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -16,7 +16,7 @@ use crate::safekeeper::ProposerAcceptorMessage; use crate::send_wal::SendWalHandler; use crate::timeline::TimelineTools; -use crate::WalAcceptorConf; +use crate::SafeKeeperConf; use zenith_utils::connstring::connection_host_port; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::pq_proto::{BeMessage, FeMessage}; @@ -33,7 +33,7 @@ pub struct ReceiveWalConn<'pg> { /// Periodically request pageserver to call back. /// If pageserver already has replication channel, it will just ignore this request /// -fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZTenantId) { +fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTenantId) { let ps_addr = conf.pageserver_addr.unwrap(); let ps_connstr = format!( "postgresql://no_user:{}@{}/no_db", diff --git a/walkeeper/src/s3_offload.rs b/walkeeper/src/s3_offload.rs index a3743928f9..0c1630b9df 100644 --- a/walkeeper/src/s3_offload.rs +++ b/walkeeper/src/s3_offload.rs @@ -18,9 +18,9 @@ use tokio::runtime; use tokio::time::sleep; use walkdir::WalkDir; -use crate::WalAcceptorConf; +use crate::SafeKeeperConf; -pub fn thread_main(conf: WalAcceptorConf) { +pub fn thread_main(conf: SafeKeeperConf) { // Create a new thread pool // // FIXME: keep it single-threaded for now, make it easier to debug with gdb, @@ -42,7 +42,7 @@ async fn offload_files( bucket: &Bucket, listing: &HashSet, dir_path: &Path, - conf: &WalAcceptorConf, + conf: &SafeKeeperConf, ) -> Result { let horizon = SystemTime::now() - conf.ttl.unwrap(); let mut n: u64 = 0; @@ -70,7 +70,7 @@ async fn offload_files( Ok(n) } -async fn main_loop(conf: &WalAcceptorConf) -> Result<()> { +async fn main_loop(conf: &SafeKeeperConf) -> Result<()> { let region = Region::Custom { region: env::var("S3_REGION").unwrap(), endpoint: env::var("S3_ENDPOINT").unwrap(), diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index fcd8595e15..565ef201b8 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -1,4 +1,4 @@ -//! Part of WAL acceptor pretending to be Postgres, streaming xlog to +//! Part of Safekeeper pretending to be Postgres, streaming xlog to //! pageserver/any other consumer. //! @@ -6,7 +6,7 @@ use crate::json_ctrl::handle_json_ctrl; use crate::receive_wal::ReceiveWalConn; use crate::replication::ReplicationConn; use crate::timeline::{Timeline, TimelineTools}; -use crate::WalAcceptorConf; +use crate::SafeKeeperConf; use anyhow::{anyhow, bail, Result}; use bytes::Bytes; use std::str::FromStr; @@ -20,7 +20,7 @@ use crate::timeline::CreateControlFile; /// Handler for streaming WAL from acceptor pub struct SendWalHandler { - pub conf: WalAcceptorConf, + pub conf: SafeKeeperConf, /// assigned application name pub appname: Option, pub tenantid: Option, @@ -85,7 +85,7 @@ impl postgres_backend::Handler for SendWalHandler { } impl SendWalHandler { - pub fn new(conf: WalAcceptorConf) -> Self { + pub fn new(conf: SafeKeeperConf) -> Self { SendWalHandler { conf, appname: None, diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index b30c061c9c..f3795941e1 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -21,7 +21,7 @@ use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo, Storage, SK_FORMAT_VERSION, SK_MAGIC, }; -use crate::WalAcceptorConf; +use crate::SafeKeeperConf; use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; const CONTROL_FILE_NAME: &str = "safekeeper.control"; @@ -104,7 +104,7 @@ impl SharedState { /// data dir. /// If create=false and file doesn't exist, bails out. fn create_restore( - conf: &WalAcceptorConf, + conf: &SafeKeeperConf, timelineid: ZTimelineId, create: CreateControlFile, ) -> Result { @@ -135,7 +135,7 @@ impl SharedState { /// Fetch and lock control file (prevent running more than one instance of safekeeper) /// If create=false and file doesn't exist, bails out. fn load_control_file( - conf: &WalAcceptorConf, + conf: &SafeKeeperConf, timelineid: ZTimelineId, create: CreateControlFile, ) -> Result<(File, SafeKeeperState)> { @@ -305,7 +305,7 @@ impl Timeline { pub trait TimelineTools { fn set( &mut self, - conf: &WalAcceptorConf, + conf: &SafeKeeperConf, tenant_id: ZTenantId, timeline_id: ZTimelineId, create: CreateControlFile, @@ -317,7 +317,7 @@ pub trait TimelineTools { impl TimelineTools for Option> { fn set( &mut self, - conf: &WalAcceptorConf, + conf: &SafeKeeperConf, tenant_id: ZTenantId, timeline_id: ZTimelineId, create: CreateControlFile, @@ -346,7 +346,7 @@ impl GlobalTimelines { /// Get a timeline with control file loaded from the global TIMELINES map. /// If control file doesn't exist and create=false, bails out. pub fn get( - conf: &WalAcceptorConf, + conf: &SafeKeeperConf, tenant_id: ZTenantId, timeline_id: ZTimelineId, create: CreateControlFile, @@ -375,7 +375,7 @@ impl GlobalTimelines { #[derive(Debug)] struct FileStorage { control_file: File, - conf: WalAcceptorConf, + conf: SafeKeeperConf, } impl Storage for FileStorage { diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 4a294e9c95..0cd97ab220 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -8,11 +8,11 @@ use std::net::{TcpListener, TcpStream}; use std::thread; use crate::send_wal::SendWalHandler; -use crate::WalAcceptorConf; +use crate::SafeKeeperConf; use zenith_utils::postgres_backend::{AuthType, PostgresBackend}; /// Accept incoming TCP connections and spawn them into a background thread. -pub fn thread_main(conf: WalAcceptorConf, listener: TcpListener) -> Result<()> { +pub fn thread_main(conf: SafeKeeperConf, listener: TcpListener) -> Result<()> { loop { match listener.accept() { Ok((socket, peer_addr)) => { @@ -31,7 +31,7 @@ pub fn thread_main(conf: WalAcceptorConf, listener: TcpListener) -> Result<()> { /// This is run by `thread_main` above, inside a background thread. /// -fn handle_socket(socket: TcpStream, conf: WalAcceptorConf) -> Result<()> { +fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<()> { socket.set_nodelay(true)?; let mut conn_handler = SendWalHandler::new(conf);