add CLI options to pageserver

Stas Kelvich
2021-03-22 22:26:47 +03:00
parent a522961d0c
commit 9e89c1e2cd
6 changed files with 243 additions and 68 deletions
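For orientation, the options introduced in this commit would make a typical invocation look roughly like the line below. The binary name pageserver and the data directory path are assumptions for illustration, not taken from the diff; the flags themselves (-D/--dir, -w/--wal-producer, -d/--daemonize, --skip-recovery) are the ones defined in main below.

    pageserver -D ./pageserver_data -w 127.0.0.1:65432 --daemonize --skip-recovery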

Cargo.lock

@@ -15,6 +15,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "ansi_term"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
dependencies = [
"winapi",
]
[[package]]
name = "arrayref"
version = "0.3.6"
@@ -315,6 +324,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "clap"
version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [
"ansi_term",
"atty",
"bitflags",
"strsim",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
@@ -1055,6 +1079,7 @@ version = "0.1.0"
dependencies = [
"byteorder",
"bytes",
"clap",
"crossbeam-channel",
"futures",
"lazy_static",
@@ -1660,6 +1685,12 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "subtle"
version = "2.4.0"
@@ -1700,6 +1731,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
]
[[package]]
name = "thiserror"
version = "1.0.24"
@@ -1898,6 +1938,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-width"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.2.1"
@@ -1937,6 +1983,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34b2f665b594b07095e3ac3f718e13c2197143416fae4c5706cffb7b1af8d7f1"
[[package]]
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.3"


@@ -16,6 +16,7 @@ futures = "0.3.13"
lazy_static = "1.4.0"
log = "0.4.14"
stderrlog = "0.5.1"
clap = "2.33.0"
rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] }
tokio = { version = "1.3.0", features = ["full"] }
tokio-stream = { version = "0.1.4" }


@@ -2,58 +2,119 @@
// Main entry point for the Page Server executable
//
use std::thread;
use log::*;
use std::io::Error;
use std::{net::IpAddr, thread};
use clap::{App, Arg};
use pageserver::page_service;
use pageserver::restore_s3;
use pageserver::walreceiver;
use pageserver::walredo;
use std::io::Error;
use pageserver::PageServerConf;
fn main() -> Result<(), Error> {
let arg_matches = App::new("Zenith page server")
.about("Materializes WAL stream to pages and serves them to the postgres")
.arg(Arg::with_name("datadir")
.short("D")
.long("dir")
.takes_value(true)
.help("Path to the page server data directory"))
.arg(Arg::with_name("wal_producer")
.short("w")
.long("wal-producer")
.takes_value(true)
.help("connect to the WAL sender (postgres or wal_acceptor) on ip:port (default: 127.0.0.1:65432)"))
.arg(Arg::with_name("daemonize")
.short("d")
.long("daemonize")
.takes_value(false)
.help("Run in the background"))
.arg(Arg::with_name("skip_recovery")
.long("skip-recovery")
.takes_value(false)
.help("Skip S3 recovery procedy and start empty"))
.get_matches();
let mut conf = PageServerConf {
data_dir: String::from("."),
daemonize: false,
wal_producer_ip: "127.0.0.1".parse::<IpAddr>().unwrap(),
wal_producer_port: 65432,
skip_recovery: false,
};
if let Some(dir) = arg_matches.value_of("datadir") {
conf.data_dir = String::from(dir);
}
if arg_matches.is_present("daemonize") {
conf.daemonize = true;
}
if arg_matches.is_present("skip_recovery") {
conf.skip_recovery = true;
}
if let Some(addr) = arg_matches.value_of("wal_producer") {
let parts: Vec<&str> = addr.split(':').collect();
conf.wal_producer_ip = parts[0].parse().unwrap();
conf.wal_producer_port = parts[1].parse().unwrap();
}
start_pageserver(conf)
}
fn start_pageserver(conf: PageServerConf) -> Result<(), Error> {
let mut threads = Vec::new();
// Initialize logger
stderrlog::new()
.verbosity(3)
.module("pageserver")
.init().unwrap();
.init()
.unwrap();
info!("starting...");
// Initialize the WAL applicator
let walredo_thread = thread::Builder::new()
.name("WAL redo thread".into()).spawn(
|| {
walredo::wal_applicator_main();
}).unwrap();
.name("WAL redo thread".into())
.spawn(|| {
walredo::wal_applicator_main();
})
.unwrap();
threads.push(walredo_thread);
// Before opening up for connections, restore the latest base backup from S3.
// (We don't persist anything to local disk at the moment, so we need to do
// this at every startup)
restore_s3::restore_main();
if !conf.skip_recovery {
restore_s3::restore_main();
}
// Launch the WAL receiver thread. It will try to connect to the WAL safekeeper,
// and stream the WAL. If the connection is lost, it will reconnect on its own.
// We just fire and forget it here.
let walreceiver_thread = thread::Builder::new()
.name("WAL receiver thread".into()).spawn(
|| {
// thread code
walreceiver::thread_main();
}).unwrap();
.name("WAL receiver thread".into())
.spawn(|| {
// thread code
walreceiver::thread_main(conf);
})
.unwrap();
threads.push(walreceiver_thread);
// GetPage@LSN requests are served by another thread. (It uses async I/O,
// but the code in page_service sets up its own thread pool for that)
let page_server_thread = thread::Builder::new()
.name("Page Service thread".into()).spawn(
|| {
// thread code
page_service::thread_main();
}).unwrap();
.name("Page Service thread".into())
.spawn(|| {
// thread code
page_service::thread_main();
})
.unwrap();
threads.push(page_server_thread);
// never returns.
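A side note on the wal-producer handling added above: main() splits the argument on ':' and parses both halves with unwrap(), so a malformed value (or an IPv6 literal, which itself contains colons) would panic. Below is a minimal hedged sketch of a more forgiving variant using std::net::SocketAddr instead of a manual split; the helper name and the Result shape are illustrative only, not part of this commit.

use std::net::{IpAddr, SocketAddr};

// Illustrative helper, not part of the commit: parse "ip:port", falling back
// to the documented default of 127.0.0.1:65432 when the flag is absent.
fn parse_wal_producer(arg: Option<&str>) -> Result<(IpAddr, u16), std::net::AddrParseError> {
    let addr: SocketAddr = arg.unwrap_or("127.0.0.1:65432").parse()?;
    Ok((addr.ip(), addr.port()))
}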


@@ -7,58 +7,91 @@
// local installations.
//
use std::{io::Write, net::{IpAddr, Ipv4Addr}};
use std::process::{Command};
use std::path::{Path, PathBuf};
use std::fs::{self, OpenOptions};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::str;
use std::{
io::Write,
net::{IpAddr, Ipv4Addr},
};
use postgres::{Client, NoTls};
// //
// // I'm intentionally modelling the storage and compute control planes as separate entities
// // as it is closer to the actual setup.
// //
// pub struct StorageControlPlane {
// keepers: Vec<SafeKeeperNode>,
// pagers: Vec<PageServerNode>
// }
//
// I'm intentionally modelling the storage and compute control planes as separate entities
// as it is closer to the actual setup.
//
pub struct StorageControlPlane {
wal_acceptors: Vec<WalAcceptorNode>,
last_wal_acceptor_port: u32,
pager_servers: Vec<PageServerNode>,
last_page_server_port: u32,
}
// impl StorageControlPlane {
// fn new_pageserver(&mut self) {
impl StorageControlPlane {
// postgres <-> page_server
// fn one_page_server(&mut self, pg_ip: IpAddr, pg_port: u32) -> StorageControlPlane {}
// }
// // postgres <-> wal_acceptor x3 <-> page_server
// fn local(&mut self) -> StorageControlPlane {
// }
// fn new_safekeeper(&mut self) {
fn get_page_server_conn_info() {}
// }
// }
fn get_wal_acceptor_conn_info() {}
}
pub struct PageServerNode {
page_service_ip: IpAddr,
page_service_port: u32,
wal_producer_ip: IpAddr,
wal_producer_port: u32,
data_dir: PathBuf,
}
impl PageServerNode {
// TODO: method to force redo on a specific relation
pub fn start() {}
}
pub struct WalAcceptorNode {
port: u32,
ip: IpAddr,
data_dir: PathBuf,
}
impl WalAcceptorNode {}
///////////////////////////////////////////////////////////////////////////////
//
// ComputeControlPlane
//
pub struct ComputeControlPlane {
pg_install_dir: PathBuf,
work_dir: PathBuf,
last_assigned_port: u32,
nodes: Vec<PostgresNode>
nodes: Vec<PostgresNode>,
}
impl ComputeControlPlane {
pub fn local() -> ComputeControlPlane {
// postgres configure and `make temp-install` are using this path
let pg_install_dir = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../tmp_install/");
let pg_install_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/");
let work_dir = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tmp_install/");
let work_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_install/");
ComputeControlPlane {
pg_install_dir: pg_install_dir,
work_dir: work_dir,
last_assigned_port: 65431,
nodes: Vec::new()
nodes: Vec::new(),
}
}
// TODO: check port availability and
// TODO: check port availability and
fn get_port(&mut self) -> u32 {
let port = self.last_assigned_port + 1;
self.last_assigned_port += 1;
@@ -72,8 +105,8 @@ impl ComputeControlPlane {
node_id: node_id,
port: self.get_port(),
ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
pgdata: self.work_dir.join(format!("compute/pg{}",node_id)),
pg_install_dir: self.pg_install_dir.clone()
pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
pg_install_dir: self.pg_install_dir.clone(),
};
self.nodes.push(node);
let node = self.nodes.last().unwrap();
@@ -97,7 +130,10 @@ impl ComputeControlPlane {
// ", node.ip).as_str());
// listen for selected port
node.append_conf("postgresql.conf", format!("\
node.append_conf(
"postgresql.conf",
format!(
"\
max_wal_senders = 10\n\
max_replication_slots = 10\n\
hot_standby = on\n\
@@ -107,24 +143,27 @@ impl ComputeControlPlane {
listen_addresses = '{address}'\n\
port = {port}\n\
",
address = node.ip,
port = node.port
).as_str());
address = node.ip,
port = node.port
)
.as_str(),
);
node
}
}
///////////////////////////////////////////////////////////////////////////////
pub struct PostgresNode {
node_id: usize,
port: u32,
ip: IpAddr,
pgdata: PathBuf,
pg_install_dir: PathBuf
pg_install_dir: PathBuf,
}
impl PostgresNode {
pub fn append_conf(&self, config: &str, opts: &str) {
OpenOptions::new()
.append(true)
@@ -138,9 +177,11 @@ impl PostgresNode {
let pg_ctl_path = self.pg_install_dir.join("bin/pg_ctl");
let pg_ctl = Command::new(pg_ctl_path)
.args(&[
"-D", self.pgdata.to_str().unwrap(),
"-l", self.pgdata.join("log").to_str().unwrap(),
action
"-D",
self.pgdata.to_str().unwrap(),
"-l",
self.pgdata.join("log").to_str().unwrap(),
action,
])
.env_clear()
.status()
@@ -178,8 +219,13 @@ impl PostgresNode {
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
// XXX: user!
let connstring = format!("host={} port={} dbname={} user={}",
self.ip, self.port, db, self.whoami());
let connstring = format!(
"host={} port={} dbname={} user={}",
self.ip,
self.port,
db,
self.whoami()
);
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
client.query(sql, &[]).unwrap()
}
@@ -198,4 +244,3 @@ impl Drop for PostgresNode {
fs::remove_dir_all(self.pgdata.clone()).unwrap();
}
}


@@ -1,7 +1,19 @@
use std::net::IpAddr;
#[allow(dead_code)]
pub mod control_plane;
pub mod page_cache;
pub mod page_service;
pub mod restore_s3;
pub mod walreceiver;
pub mod control_plane;
pub mod waldecoder;
pub mod page_cache;
pub mod walreceiver;
pub mod walredo;
pub struct PageServerConf {
pub data_dir: String,
pub daemonize: bool,
pub wal_producer_ip: IpAddr,
pub wal_producer_port: u32,
pub skip_recovery: bool,
}
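The defaults for this struct currently live as a literal inside main(); a hedged sketch of how they could also be expressed as a Default impl in this file is shown below. The values mirror the ones hard-coded in the binary, and the impl assumes the use std::net::IpAddr already at the top of this file; the impl itself is not part of this commit.

impl Default for PageServerConf {
    fn default() -> Self {
        // Mirrors the defaults hard-coded in the page server binary's main().
        PageServerConf {
            data_dir: String::from("."),
            daemonize: false,
            wal_producer_ip: "127.0.0.1".parse::<IpAddr>().unwrap(),
            wal_producer_port: 65432,
            skip_recovery: false,
        }
    }
}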


@@ -14,6 +14,7 @@ use tokio::time::{sleep, Duration};
use crate::waldecoder::WalStreamDecoder;
use crate::page_cache;
use crate::page_cache::BufferTag;
use crate::PageServerConf;
use tokio_postgres::{connect_replication, NoTls, Error, ReplicationMode};
use postgres_protocol::message::backend::ReplicationMessage;
@@ -21,7 +22,7 @@ use postgres_protocol::message::backend::ReplicationMessage;
//
// This is the entry point for the WAL receiver thread.
//
pub fn thread_main() {
pub fn thread_main(conf: PageServerConf) {
info!("Starting WAL receiver");
@@ -32,7 +33,7 @@ pub fn thread_main() {
runtime.block_on( async {
loop {
let _res = walreceiver_main().await;
let _res = walreceiver_main(&conf).await;
// TODO: print/log the error
info!("WAL streaming connection failed, retrying in 5 seconds...");
@@ -41,13 +42,16 @@ pub fn thread_main() {
});
}
async fn walreceiver_main() -> Result<(), Error> {
async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
// Connect to the database in replication mode.
debug!("connecting...");
let (mut rclient, connection) =
connect_replication("host=localhost user=zenith port=65432", NoTls, ReplicationMode::Physical).await?;
let conn_str = format!("host={} user=zenith port={}", conf.wal_producer_ip, conf.wal_producer_port);
debug!("connecting to {}...", conn_str);
let (mut rclient, connection) = connect_replication(
conn_str.as_str(),
NoTls,
ReplicationMode::Physical
).await?;
debug!("connected!");
// The connection object performs the actual communication with the database,