start pageserver and pg on top of it

This commit is contained in:
Stas Kelvich
2021-03-23 22:56:16 +03:00
parent 8a80d055b9
commit 667ec0db60
7 changed files with 141 additions and 43 deletions

View File

@@ -6,7 +6,7 @@ use log::*;
use std::fs::File;
use std::io;
use std::path::PathBuf;
use std::{net::IpAddr, thread};
use std::thread;
use clap::{App, Arg};
use daemonize::Daemonize;
@@ -36,6 +36,11 @@ fn main() -> Result<(), io::Error> {
.long("wal-producer")
.takes_value(true)
.help("connect to the WAL sender (postgres or wal_acceptor) on ip:port (default: 127.0.0.1:65432)"))
.arg(Arg::with_name("listen")
.short("l")
.long("listen")
.takes_value(true)
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"))
.arg(Arg::with_name("interactive")
.short("i")
.long("interactive")
@@ -56,8 +61,8 @@ fn main() -> Result<(), io::Error> {
data_dir: PathBuf::from("./"),
daemonize: false,
interactive: false,
wal_producer_ip: "127.0.0.1".parse::<IpAddr>().unwrap(),
wal_producer_port: 65432,
wal_producer_addr: "127.0.0.1:65432".parse().unwrap(),
listen_addr: "127.0.0.1:5430".parse().unwrap(),
skip_recovery: false,
};
@@ -85,9 +90,11 @@ fn main() -> Result<(), io::Error> {
}
if let Some(addr) = arg_matches.value_of("wal_producer") {
let parts: Vec<&str> = addr.split(':').collect();
conf.wal_producer_ip = parts[0].parse().unwrap();
conf.wal_producer_port = parts[1].parse().unwrap();
conf.wal_producer_addr = addr.parse().unwrap();
}
if let Some(addr) = arg_matches.value_of("listen") {
conf.listen_addr = addr.parse().unwrap();
}
start_pageserver(conf)
@@ -160,22 +167,24 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
// Launch the WAL receiver thread. It will try to connect to the WAL safekeeper,
// and stream the WAL. If the connection is lost, it will reconnect on its own.
// We just fire and forget it here.
let conf1 = conf.clone();
let walreceiver_thread = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(|| {
// thread code
walreceiver::thread_main(conf);
walreceiver::thread_main(conf1);
})
.unwrap();
threads.push(walreceiver_thread);
// GetPage@LSN requests are served by another thread. (It uses async I/O,
// but the code in page_service sets up it own thread pool for that)
let conf2 = conf.clone();
let page_server_thread = thread::Builder::new()
.name("Page Service thread".into())
.spawn(|| {
// thread code
page_service::thread_main();
page_service::thread_main(conf2);
})
.unwrap();
threads.push(page_server_thread);

View File

@@ -13,7 +13,7 @@ use std::process::Command;
use std::str;
use std::{
io::Write,
net::{IpAddr, Ipv4Addr},
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use postgres::{Client, NoTls};
@@ -24,41 +24,96 @@ use postgres::{Client, NoTls};
//
pub struct StorageControlPlane {
wal_acceptors: Vec<WalAcceptorNode>,
last_wal_acceptor_port: u32,
pager_servers: Vec<PageServerNode>,
last_page_server_port: u32,
page_servers: Vec<PageServerNode>,
}
impl StorageControlPlane {
// postgres <-> page_server
// fn one_page_server(&mut self, pg_ip: IpAddr, pg_port: u32) -> StorageControlPlane {}
pub fn one_page_server(pg_addr: SocketAddr) -> StorageControlPlane {
let mut cplane = StorageControlPlane {
wal_acceptors: Vec::new(),
page_servers: Vec::new(),
};
let workdir = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_install/pageserver1");
fs::create_dir(workdir.clone()).unwrap();
let pserver = PageServerNode{
page_service_addr: "127.0.0.1:65200".parse().unwrap(),
wal_producer_addr: pg_addr,
data_dir: workdir
};
pserver.start();
cplane.page_servers.push(pserver);
cplane
}
// // postgres <-> wal_acceptor x3 <-> page_server
// fn local(&mut self) -> StorageControlPlane {
// }
fn get_page_server_conn_info() {}
pub fn page_server_addr(&self) -> &SocketAddr {
&self.page_servers[0].page_service_addr
}
fn get_wal_acceptor_conn_info() {}
}
pub struct PageServerNode {
page_service_ip: IpAddr,
page_service_port: u32,
wal_producer_ip: IpAddr,
wal_producer_port: u32,
page_service_addr: SocketAddr,
wal_producer_addr: SocketAddr,
data_dir: PathBuf,
}
impl PageServerNode {
// TODO: method to force redo on a specific relation
pub fn start() {}
fn binary_path(&self) -> PathBuf {
Path::new(env!("CARGO_MANIFEST_DIR")).join("./target/debug/pageserver")
}
pub fn start(&self) {
let status = Command::new(self.binary_path())
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-w", self.wal_producer_addr.to_string().as_str()])
.args(&["-l", self.page_service_addr.to_string().as_str()])
.arg("-d")
.arg("--skip-recovery")
.env_clear()
.status()
.expect("failed to execute initdb");
if !status.success() {
panic!("pageserver start failed");
}
}
pub fn stop(&self) {
let pifile = self.data_dir.join("pageserver.pid");
let pid = fs::read_to_string(pifile).unwrap();
let status = Command::new("kill")
.arg(pid)
.env_clear()
.status()
.expect("failed to execute kill");
if !status.success() {
panic!("kill start failed");
}
}
}
impl Drop for PageServerNode {
fn drop(&mut self) {
self.stop();
// fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
}
pub struct WalAcceptorNode {
port: u32,
ip: IpAddr,
listen: SocketAddr,
data_dir: PathBuf,
}
@@ -72,7 +127,7 @@ impl WalAcceptorNode {}
pub struct ComputeControlPlane {
pg_install_dir: PathBuf,
work_dir: PathBuf,
last_assigned_port: u32,
last_assigned_port: u16,
nodes: Vec<PostgresNode>,
}
@@ -92,7 +147,7 @@ impl ComputeControlPlane {
}
// TODO: check port availability and
fn get_port(&mut self) -> u32 {
fn get_port(&mut self) -> u16 {
let port = self.last_assigned_port + 1;
self.last_assigned_port += 1;
port
@@ -132,8 +187,7 @@ impl ComputeControlPlane {
// listen for selected port
node.append_conf(
"postgresql.conf",
format!(
"\
format!("\
max_wal_senders = 10\n\
max_replication_slots = 10\n\
hot_standby = on\n\
@@ -142,12 +196,7 @@ impl ComputeControlPlane {
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n\
",
address = node.ip,
port = node.port
)
.as_str(),
);
", address = node.ip, port = node.port).as_str());
node
}
@@ -157,7 +206,7 @@ impl ComputeControlPlane {
pub struct PostgresNode {
_node_id: usize,
port: u32,
port: u16,
ip: IpAddr,
pgdata: PathBuf,
pg_install_dir: PathBuf,
@@ -204,6 +253,10 @@ impl PostgresNode {
self.pg_ctl("stop", true);
}
pub fn addr(&self) -> SocketAddr {
SocketAddr::new(self.ip, self.port)
}
// XXX: cache that in control plane
fn whoami(&self) -> String {
let output = Command::new("whoami")
@@ -241,6 +294,6 @@ impl Drop for PostgresNode {
// TODO: put logs to a separate location to run `tail -F` on them
fn drop(&mut self) {
self.pg_ctl("stop", false);
fs::remove_dir_all(self.pgdata.clone()).unwrap();
// fs::remove_dir_all(self.pgdata.clone()).unwrap();
}
}

View File

@@ -1,4 +1,4 @@
use std::net::IpAddr;
use std::net::SocketAddr;
use std::path::PathBuf;
#[allow(dead_code)]
@@ -15,11 +15,12 @@ pub mod tui_event;
mod tui_logger;
#[allow(dead_code)]
#[derive(Clone)]
pub struct PageServerConf {
pub data_dir: PathBuf,
pub daemonize: bool,
pub interactive: bool,
pub wal_producer_ip: IpAddr,
pub wal_producer_port: u32,
pub wal_producer_addr: SocketAddr,
pub listen_addr: SocketAddr,
pub skip_recovery: bool,
}

View File

@@ -19,6 +19,7 @@ use std::io;
use log::*;
use crate::page_cache;
use crate::PageServerConf;
type Result<T> = std::result::Result<T, io::Error>;
@@ -205,7 +206,7 @@ impl FeMessage {
///////////////////////////////////////////////////////////////////////////////
pub fn thread_main() {
pub fn thread_main(conf: PageServerConf) {
// Create a new thread pool
//
@@ -214,11 +215,10 @@ pub fn thread_main() {
//let runtime = runtime::Runtime::new().unwrap();
let runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap();
let listen_address = "127.0.0.1:5430";
info!("Starting page server on {}", listen_address);
info!("Starting page server on {}", conf.listen_addr);
runtime.block_on(async {
let _unused = page_service_main(listen_address).await;
let _unused = page_service_main(conf.listen_addr.to_string().as_str()).await;
});
}

View File

@@ -45,7 +45,7 @@ pub fn thread_main(conf: PageServerConf) {
async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
// Connect to the database in replication mode.
let conn_str = format!("host={} user=zenith port={}", conf.wal_producer_ip, conf.wal_producer_port);
let conn_str = format!("host={} user=zenith port={}", conf.wal_producer_addr.ip(), conf.wal_producer_addr.port());
debug!("connecting to {}...", conn_str);
let (mut rclient, connection) = connect_replication(
conn_str.as_str(),

View File

@@ -1,4 +1,5 @@
use pageserver::control_plane::ComputeControlPlane;
use pageserver::control_plane::StorageControlPlane;
#[test]
fn test_actions() {

View File

@@ -1,10 +1,44 @@
use std::thread::sleep;
use std::time::Duration;
use pageserver::control_plane::ComputeControlPlane;
use pageserver::control_plane::StorageControlPlane;
// XXX: force all redo at the end
// -- restart + seqscan won'r read deleted stuff
// -- restart + seqscan won't read deleted stuff
// -- pageserver api endpoint to check all rels
// Handcrafted cases with wal records that are (were) problematic for redo.
#[test]
fn test_redo_cases() {}
fn test_redo_cases() {
// Allocate postgres instance
let mut compute_cplane = ComputeControlPlane::local();
let node = compute_cplane.new_vanilla_node();
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server(node.addr());
let pageserver_addr = storage_cplane.page_server_addr();
// Configure that node to take pages from pageserver
node.append_conf("postgresql.conf", format!("\
page_server_connstring = 'host={} port={}'\n\
", pageserver_addr.ip(), pageserver_addr.port()).as_str());
println!("pew!");
sleep(Duration::from_secs(5));
node.start();
node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
let count: i64 = node
.safe_psql("postgres", "SELECT count(*) FROM t")
.first()
.unwrap()
.get(0);
assert_eq!(count, 100000);
}
// Runs pg_regress on a compute node
#[test]