Rename WalAcceptor to Safekeeper in most places (#741)

This commit is contained in:
Egor Suvorov
2021-10-21 18:26:43 +03:00
committed by GitHub
parent c310932121
commit c058d04250
9 changed files with 33 additions and 33 deletions

View File

@@ -17,7 +17,7 @@ use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::http;
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
use walkeeper::SafeKeeperConf;
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
@@ -54,7 +54,7 @@ fn main() -> Result<()> {
Arg::with_name("ttl")
.long("ttl")
.takes_value(true)
.help("interval for keeping WAL as walkeeper node, after which them will be uploaded to S3 and removed locally"),
.help("interval for keeping WAL at safekeeper node, after which them will be uploaded to S3 and removed locally"),
)
.arg(
Arg::with_name("recall")
@@ -78,7 +78,7 @@ fn main() -> Result<()> {
)
.get_matches();
let mut conf = WalAcceptorConf {
let mut conf = SafeKeeperConf {
data_dir: PathBuf::from("./"),
daemonize: false,
no_sync: false,
@@ -125,10 +125,10 @@ fn main() -> Result<()> {
conf.recall_period = Some(humantime::parse_duration(recall)?);
}
start_wal_acceptor(conf)
start_safekeeper(conf)
}
fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let log_filename = conf.data_dir.join("safekeeper.log");
let log_file = logging::init(log_filename, conf.daemonize)?;

View File

@@ -10,7 +10,7 @@ use zenith_utils::lsn::Lsn;
use crate::safekeeper::AcceptorState;
use crate::timeline::CreateControlFile;
use crate::timeline::GlobalTimelines;
use crate::WalAcceptorConf;
use crate::SafeKeeperConf;
use zenith_utils::http::endpoint;
use zenith_utils::http::error::ApiError;
use zenith_utils::http::json::json_response;
@@ -22,9 +22,9 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(json_response(StatusCode::OK, "")?)
}
fn get_conf(request: &Request<Body>) -> &WalAcceptorConf {
fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
request
.data::<Arc<WalAcceptorConf>>()
.data::<Arc<SafeKeeperConf>>()
.expect("unknown state type")
.as_ref()
}
@@ -80,7 +80,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
}
/// Safekeeper http router.
pub fn make_router(conf: WalAcceptorConf) -> RouterBuilder<hyper::Body, ApiError> {
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
let router = endpoint::make_router();
router
.data(Arc::new(conf))

View File

@@ -23,7 +23,7 @@ pub mod defaults {
}
#[derive(Debug, Clone)]
pub struct WalAcceptorConf {
pub struct SafeKeeperConf {
pub data_dir: PathBuf,
pub daemonize: bool,
pub no_sync: bool,

View File

@@ -16,7 +16,7 @@ use crate::safekeeper::ProposerAcceptorMessage;
use crate::send_wal::SendWalHandler;
use crate::timeline::TimelineTools;
use crate::WalAcceptorConf;
use crate::SafeKeeperConf;
use zenith_utils::connstring::connection_host_port;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeMessage};
@@ -33,7 +33,7 @@ pub struct ReceiveWalConn<'pg> {
/// Periodically request pageserver to call back.
/// If pageserver already has replication channel, it will just ignore this request
///
fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
let ps_addr = conf.pageserver_addr.unwrap();
let ps_connstr = format!(
"postgresql://no_user:{}@{}/no_db",

View File

@@ -18,9 +18,9 @@ use tokio::runtime;
use tokio::time::sleep;
use walkdir::WalkDir;
use crate::WalAcceptorConf;
use crate::SafeKeeperConf;
pub fn thread_main(conf: WalAcceptorConf) {
pub fn thread_main(conf: SafeKeeperConf) {
// Create a new thread pool
//
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
@@ -42,7 +42,7 @@ async fn offload_files(
bucket: &Bucket,
listing: &HashSet<String>,
dir_path: &Path,
conf: &WalAcceptorConf,
conf: &SafeKeeperConf,
) -> Result<u64> {
let horizon = SystemTime::now() - conf.ttl.unwrap();
let mut n: u64 = 0;
@@ -70,7 +70,7 @@ async fn offload_files(
Ok(n)
}
async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
async fn main_loop(conf: &SafeKeeperConf) -> Result<()> {
let region = Region::Custom {
region: env::var("S3_REGION").unwrap(),
endpoint: env::var("S3_ENDPOINT").unwrap(),

View File

@@ -1,4 +1,4 @@
//! Part of WAL acceptor pretending to be Postgres, streaming xlog to
//! Part of Safekeeper pretending to be Postgres, streaming xlog to
//! pageserver/any other consumer.
//!
@@ -6,7 +6,7 @@ use crate::json_ctrl::handle_json_ctrl;
use crate::receive_wal::ReceiveWalConn;
use crate::replication::ReplicationConn;
use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
use crate::SafeKeeperConf;
use anyhow::{anyhow, bail, Result};
use bytes::Bytes;
use std::str::FromStr;
@@ -20,7 +20,7 @@ use crate::timeline::CreateControlFile;
/// Handler for streaming WAL from acceptor
pub struct SendWalHandler {
pub conf: WalAcceptorConf,
pub conf: SafeKeeperConf,
/// assigned application name
pub appname: Option<String>,
pub tenantid: Option<ZTenantId>,
@@ -85,7 +85,7 @@ impl postgres_backend::Handler for SendWalHandler {
}
impl SendWalHandler {
pub fn new(conf: WalAcceptorConf) -> Self {
pub fn new(conf: SafeKeeperConf) -> Self {
SendWalHandler {
conf,
appname: None,

View File

@@ -21,7 +21,7 @@ use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
Storage, SK_FORMAT_VERSION, SK_MAGIC,
};
use crate::WalAcceptorConf;
use crate::SafeKeeperConf;
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
const CONTROL_FILE_NAME: &str = "safekeeper.control";
@@ -104,7 +104,7 @@ impl SharedState {
/// data dir.
/// If create=false and file doesn't exist, bails out.
fn create_restore(
conf: &WalAcceptorConf,
conf: &SafeKeeperConf,
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<Self> {
@@ -135,7 +135,7 @@ impl SharedState {
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
fn load_control_file(
conf: &WalAcceptorConf,
conf: &SafeKeeperConf,
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<(File, SafeKeeperState)> {
@@ -305,7 +305,7 @@ impl Timeline {
pub trait TimelineTools {
fn set(
&mut self,
conf: &WalAcceptorConf,
conf: &SafeKeeperConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
create: CreateControlFile,
@@ -317,7 +317,7 @@ pub trait TimelineTools {
impl TimelineTools for Option<Arc<Timeline>> {
fn set(
&mut self,
conf: &WalAcceptorConf,
conf: &SafeKeeperConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
create: CreateControlFile,
@@ -346,7 +346,7 @@ impl GlobalTimelines {
/// Get a timeline with control file loaded from the global TIMELINES map.
/// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &WalAcceptorConf,
conf: &SafeKeeperConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
create: CreateControlFile,
@@ -375,7 +375,7 @@ impl GlobalTimelines {
#[derive(Debug)]
struct FileStorage {
control_file: File,
conf: WalAcceptorConf,
conf: SafeKeeperConf,
}
impl Storage for FileStorage {

View File

@@ -8,11 +8,11 @@ use std::net::{TcpListener, TcpStream};
use std::thread;
use crate::send_wal::SendWalHandler;
use crate::WalAcceptorConf;
use crate::SafeKeeperConf;
use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
pub fn thread_main(conf: WalAcceptorConf, listener: TcpListener) -> Result<()> {
pub fn thread_main(conf: SafeKeeperConf, listener: TcpListener) -> Result<()> {
loop {
match listener.accept() {
Ok((socket, peer_addr)) => {
@@ -31,7 +31,7 @@ pub fn thread_main(conf: WalAcceptorConf, listener: TcpListener) -> Result<()> {
/// This is run by `thread_main` above, inside a background thread.
///
fn handle_socket(socket: TcpStream, conf: WalAcceptorConf) -> Result<()> {
fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<()> {
socket.set_nodelay(true)?;
let mut conn_handler = SendWalHandler::new(conf);