Merge branch 'main' into rocksdb_pageserver

Konstantin Knizhnik
2021-04-19 16:28:29 +03:00
37 changed files with 2097 additions and 943 deletions


@@ -4,6 +4,7 @@ on: [push]
jobs:
regression-check:
timeout-minutes: 10
name: run regression test suite
runs-on: ubuntu-latest
@@ -76,10 +77,7 @@ jobs:
target
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
# That build only builds dependencies, and can be skipped if Cargo.lock
# wasn't changed. Later steps run their own build
- name: Install cargo deps
if: steps.cache_cargo.outputs.cache-hit != 'true'
- name: Build
run: |
cargo build

.gitignore

@@ -1,3 +1,5 @@
/target
/tmp_check
/tmp_install
/tmp_check_cli
.vscode

Cargo.lock

@@ -420,6 +420,21 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "control_plane"
version = "0.1.0"
dependencies = [
"home",
"lazy_static",
"postgres",
"rand 0.8.3",
"regex",
"serde",
"serde_derive",
"tokio-postgres",
"toml",
]
[[package]]
name = "core-foundation"
version = "0.9.1"
@@ -843,6 +858,15 @@ dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2456aef2e6b6a9784192ae780c0f15bc57df0e918585282325e8c8ac27737654"
dependencies = [
"winapi",
]
[[package]]
name = "http"
version = "0.2.3"
@@ -894,7 +918,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project",
"socket2 0.4.0",
"socket2",
"tokio",
"tower-service",
"tracing",
@@ -948,12 +972,11 @@ dependencies = [
name = "integration_tests"
version = "0.1.0"
dependencies = [
"control_plane",
"lazy_static",
"pageserver",
"postgres",
"rand 0.8.3",
"tokio-postgres",
"walkeeper",
]
[[package]]
@@ -1146,7 +1169,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d"
dependencies = [
"libc",
"socket2 0.4.0",
"socket2",
]
[[package]]
@@ -1276,6 +1299,7 @@ dependencies = [
"log",
"postgres",
"postgres-protocol",
"postgres-types",
"rand 0.8.3",
"regex",
"rocksdb",
@@ -1286,10 +1310,12 @@ dependencies = [
"slog-stdlog",
"slog-term",
"termion",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-stream",
"tui",
"walkdir",
]
[[package]]
@@ -1406,8 +1432,8 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.0"
source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02"
version = "0.19.1"
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094"
dependencies = [
"bytes",
"fallible-iterator",
@@ -1419,8 +1445,8 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.0"
source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02"
version = "0.6.1"
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094"
dependencies = [
"base64",
"byteorder",
@@ -1436,8 +1462,8 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.0"
source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02"
version = "0.2.1"
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094"
dependencies = [
"bytes",
"fallible-iterator",
@@ -1701,7 +1727,7 @@ dependencies = [
[[package]]
name = "rust-s3"
version = "0.27.0-beta1"
source = "git+https://github.com/hlinnaka/rust-s3#7f15a24ec7daa0a5d9516da706212745f9042818"
source = "git+https://github.com/hlinnaka/rust-s3?rev=7f15a24ec7daa0a5d9516da706212745f9042818#7f15a24ec7daa0a5d9516da706212745f9042818"
dependencies = [
"async-std",
"async-trait",
@@ -1756,6 +1782,15 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.19"
@@ -1967,17 +2002,6 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if 1.0.0",
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.4.0"
@@ -2171,8 +2195,8 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.0"
source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02"
version = "0.7.1"
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094"
dependencies = [
"async-trait",
"byteorder",
@@ -2183,10 +2207,10 @@ dependencies = [
"parking_lot",
"percent-encoding",
"phf",
"pin-project",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"socket2 0.3.19",
"socket2",
"tokio",
"tokio-util",
]
@@ -2216,6 +2240,15 @@ dependencies = [
"tokio",
]
[[package]]
name = "toml"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
dependencies = [
"serde",
]
[[package]]
name = "tower-service"
version = "0.3.1"
@@ -2354,6 +2387,17 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi",
"winapi-util",
]
[[package]]
name = "walkeeper"
version = "0.1.0"
@@ -2370,7 +2414,6 @@ dependencies = [
"futures",
"lazy_static",
"log",
"pageserver",
"postgres",
"postgres-protocol",
"rand 0.8.3",
@@ -2519,6 +2562,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
@@ -2539,3 +2591,11 @@ name = "xml-rs"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "zenith"
version = "0.1.0"
dependencies = [
"clap",
"control_plane",
]


@@ -3,4 +3,6 @@ members = [
"integration_tests",
"pageserver",
"walkeeper",
"zenith",
"control_plane",
]


@@ -2,6 +2,54 @@
Zenith replaces the PostgreSQL storage layer and redistributes data across a cluster of nodes
## Running local installation
1. Build zenith and patched postgres
```sh
git clone --recursive https://github.com/libzenith/zenith.git
cd zenith
./pgbuild.sh # builds postgres and installs it to ./tmp_install
cargo build
```
2. Start pageserver and postgres on top of it (run from the repo root):
```sh
# Create ~/.zenith with proper paths to binaries and data
# Later this will be the responsibility of a package install script
> ./target/debug/zenith init
# start pageserver
> ./target/debug/zenith start
Starting pageserver at '127.0.0.1:64000'
# create and configure postgres data dir
> ./target/debug/zenith pg create
Creating new postgres: path=/Users/user/code/zenith/tmp_check_cli/compute/pg1 port=55432
Database initialized
# start it
> ./target/debug/zenith pg start pg1
# look up status and connection info
> ./target/debug/zenith pg list
NODE ADDRESS STATUS
pg1 127.0.0.1:55432 running
```
3. Now it is possible to connect to postgres and run some queries:
```
> psql -p55432 -h 127.0.0.1 postgres
postgres=# CREATE TABLE t(key int primary key, value text);
CREATE TABLE
postgres=# insert into t values(1,1);
INSERT 0 1
postgres=# select * from t;
key | value
-----+-------
1 | 1
(1 row)
```
## Running tests
```sh

control_plane/.gitignore

@@ -0,0 +1 @@
tmp_check/

control_plane/Cargo.toml

@@ -0,0 +1,19 @@
[package]
name = "control_plane"
version = "0.1.0"
authors = ["Stas Kelvich <stas@zenith.tech>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = "0.8.3"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
serde = ""
serde_derive = ""
toml = ""
home = "0.5.3"
lazy_static = ""
regex = "1"


@@ -0,0 +1,431 @@
use std::error;
use std::fs::File;
use std::fs::{self, OpenOptions};
use std::net::TcpStream;
use std::process::{Command, Stdio};
use std::sync::Arc;
use std::time::Duration;
use std::{collections::BTreeMap, path::PathBuf};
use std::{io::Write, net::SocketAddr};
use lazy_static::lazy_static;
use postgres::{Client, NoTls};
use regex::Regex;
use crate::local_env::{self, LocalEnv};
use crate::storage::{PageServerNode, WalProposerNode};
type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
//
// ComputeControlPlane
//
pub struct ComputeControlPlane {
base_port: u16,
pageserver: Arc<PageServerNode>,
pub nodes: BTreeMap<String, Arc<PostgresNode>>,
env: LocalEnv,
}
impl ComputeControlPlane {
// Load current nodes with ports from data directories on disk
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
// TODO: since the pageserver does not have a config file yet, we assume here
// that it is running on the default port. Change this once the pageserver has a config.
let pageserver = Arc::new(PageServerNode::from_env(&env));
let nodes: Result<BTreeMap<_, _>> = fs::read_dir(env.compute_dir())
.map_err(|e| {
format!(
"failed to list {}: {}",
env.compute_dir().to_str().unwrap(),
e
)
})?
.into_iter()
.map(|f| {
PostgresNode::from_dir_entry(f?, &env, &pageserver)
.map(|node| (node.name.clone(), Arc::new(node)))
})
.collect();
let nodes = nodes?;
Ok(ComputeControlPlane {
base_port: 55431,
pageserver,
nodes,
env,
})
}
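// Pick the next port: one above the highest port currently in use, or
// base_port + 1 if no nodes exist yet.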
fn get_port(&mut self) -> u16 {
1 + self
.nodes
.iter()
.map(|(_name, node)| node.address.port())
.max()
.unwrap_or(self.base_port)
}
pub fn local(pageserver: &Arc<PageServerNode>) -> ComputeControlPlane {
let env = local_env::test_env();
ComputeControlPlane {
base_port: 65431,
pageserver: Arc::clone(pageserver),
nodes: BTreeMap::new(),
env,
}
}
fn new_vanilla_node(&mut self, is_test: bool) -> Result<Arc<PostgresNode>> {
// allocate new node entry with generated port
let node_id = self.nodes.len() as u32 + 1;
let node = Arc::new(PostgresNode {
name: format!("pg{}", node_id),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), self.get_port()),
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
is_test,
});
node.init_vanilla()?;
self.nodes.insert(node.name.clone(), Arc::clone(&node));
Ok(node)
}
pub fn new_test_node(&mut self) -> Arc<PostgresNode> {
let addr = self.pageserver.address().clone();
let node = self.new_vanilla_node(true).unwrap();
// Configure that node to take pages from pageserver
node.append_conf(
"postgresql.conf",
format!(
"page_server_connstring = 'host={} port={}'\n",
addr.ip(),
addr.port()
)
.as_str(),
);
node
}
pub fn new_test_master_node(&mut self) -> Arc<PostgresNode> {
let node = self.new_vanilla_node(true).unwrap();
println!("Create vanilla node at {:?}", node.address);
node.append_conf(
"postgresql.conf",
"synchronous_standby_names = 'safekeeper_proxy'\n",
);
node
}
pub fn new_node(&mut self) -> Result<Arc<PostgresNode>> {
let addr = self.pageserver.address().clone();
let node = self.new_vanilla_node(false)?;
// Configure that node to take pages from pageserver
node.append_conf(
"postgresql.conf",
format!(
"page_server_connstring = 'host={} port={}'\n",
addr.ip(),
addr.port()
)
.as_str(),
);
Ok(node)
}
}
///////////////////////////////////////////////////////////////////////////////
pub struct PostgresNode {
pub address: SocketAddr,
name: String,
pub env: LocalEnv,
pageserver: Arc<PageServerNode>,
is_test: bool,
}
impl PostgresNode {
fn from_dir_entry(
entry: std::fs::DirEntry,
env: &LocalEnv,
pageserver: &Arc<PageServerNode>,
) -> Result<PostgresNode> {
if !entry.file_type()?.is_dir() {
let err_msg = format!(
"PostgresNode::from_dir_entry failed: '{}' is not a directory",
entry.path().to_str().unwrap()
);
return Err(err_msg.into());
}
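// CONF_PORT_RE matches a postgresql.conf line such as: port = 55432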
lazy_static! {
static ref CONF_PORT_RE: Regex = Regex::new(r"(?m)^\s*port\s*=\s*(\d+)\s*$").unwrap();
}
// parse data directory name
let fname = entry.file_name();
let name = fname.to_str().unwrap().to_string();
// find out tcp port in config file
let cfg_path = entry.path().join("postgresql.conf");
let config = fs::read_to_string(cfg_path.clone()).map_err(|e| {
format!(
"failed to read config file in {}: {}",
cfg_path.to_str().unwrap(),
e
)
})?;
let err_msg = format!(
"failed to find port definition in config file {}",
cfg_path.to_str().unwrap()
);
let port: u16 = CONF_PORT_RE
.captures(config.as_str())
.ok_or(err_msg.clone() + " 1")?
.iter()
.last()
.ok_or(err_msg.clone() + " 2")?
.ok_or(err_msg.clone() + " 3")?
.as_str()
.parse()
.map_err(|e| format!("{}: {}", err_msg, e))?;
// ok now
Ok(PostgresNode {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
name,
env: env.clone(),
pageserver: Arc::clone(pageserver),
is_test: false,
})
}
fn init_vanilla(&self) -> Result<()> {
println!(
"Creating new postgres: path={} port={}",
self.pgdata().to_str().unwrap(),
self.address.port()
);
// initialize data directory
if self.is_test {
fs::remove_dir_all(self.pgdata().to_str().unwrap()).ok();
}
fs::create_dir_all(self.pgdata().to_str().unwrap())?;
let initdb_path = self.env.pg_bin_dir().join("initdb");
let initdb = Command::new(initdb_path)
.args(&["-D", self.pgdata().to_str().unwrap()])
.arg("-N")
.arg("-A trust")
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.stdout(Stdio::null())
.status()?;
if !initdb.success() {
return Err("initdb failed".into());
}
// listen on the selected port
self.append_conf(
"postgresql.conf",
format!(
"max_wal_senders = 10\n\
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
max_connections = 100\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",
address = self.address.ip(),
port = self.address.port()
)
.as_str(),
);
println!("Database initialized");
Ok(())
}
pub fn pgdata(&self) -> PathBuf {
self.env.compute_dir().join(self.name.clone())
}
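// Infer node state from two signals: the presence of postmaster.pid and
// whether the node's port accepts TCP connections.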
pub fn status(&self) -> &str {
let timeout = Duration::from_millis(300);
let has_pidfile = self.pgdata().join("postmaster.pid").exists();
let can_connect = TcpStream::connect_timeout(&self.address, timeout).is_ok();
match (has_pidfile, can_connect) {
(true, true) => "running",
(false, false) => "stopped",
(true, false) => "crashed",
(false, true) => "running, no pidfile",
}
}
pub fn append_conf(&self, config: &str, opts: &str) {
OpenOptions::new()
.append(true)
.open(self.pgdata().join(config).to_str().unwrap())
.unwrap()
.write_all(opts.as_bytes())
.unwrap();
}
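// Run pg_ctl with this node's data directory and log file, plus the
// caller-supplied arguments (e.g. "start", "stop").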
fn pg_ctl(&self, args: &[&str]) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");
let pg_ctl = Command::new(pg_ctl_path)
.args(
[
&[
"-D",
self.pgdata().to_str().unwrap(),
"-l",
self.pgdata().join("log").to_str().unwrap(),
],
args,
]
.concat(),
)
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()?;
if !pg_ctl.success() {
Err("pg_ctl failed".into())
} else {
Ok(())
}
}
pub fn start(&self) -> Result<()> {
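// Register this node's connection string with the pageserver
// ("callmemaybe <connstr>") before starting postgres.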
let _res = self
.pageserver
.page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
println!("Starting postgres node at '{}'", self.connstr());
self.pg_ctl(&["start"])
}
pub fn restart(&self) -> Result<()> {
self.pg_ctl(&["restart"])
}
pub fn stop(&self) -> Result<()> {
self.pg_ctl(&["-m", "immediate", "stop"])
}
pub fn connstr(&self) -> String {
format!(
"host={} port={} user={}",
self.address.ip(),
self.address.port(),
self.whoami()
)
}
// XXX: cache that in control plane
pub fn whoami(&self) -> String {
let output = Command::new("whoami")
.output()
.expect("failed to execute whoami");
if !output.status.success() {
panic!("whoami failed");
}
String::from_utf8(output.stdout).unwrap().trim().to_string()
}
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Running {}", sql);
client.query(sql, &[]).unwrap()
}
pub fn open_psql(&self, db: &str) -> Client {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
Client::connect(connstring.as_str(), NoTls).unwrap()
}
/* Create a stub control file and corresponding xlog so the compute node can start */
pub fn setup_controlfile(&self) {
let filepath = format!("{}/global/pg_control", self.pgdata().to_str().unwrap());
{
File::create(filepath).unwrap();
}
let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal");
let pg_resetwal = Command::new(pg_resetwal_path)
.args(&["-D", self.pgdata().to_str().unwrap()])
.arg("-f")
// TODO probably we will have to modify pg_resetwal
// .arg("--compute-node")
.status()
.expect("failed to execute pg_resetwal");
if !pg_resetwal.success() {
panic!("pg_resetwal failed");
}
}
pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode {
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["-s", &wal_acceptors])
.args(&["-h", &self.address.ip().to_string()])
.args(&["-p", &self.address.port().to_string()])
.arg("-v")
.stderr(OpenOptions::new()
.append(true)
.open(self.env.data_dir.join("safepkeeper_proxy.log")).unwrap())
.spawn()
{
Ok(child) => WalProposerNode { pid: child.id() },
Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e),
}
}
// TODO
pub fn pg_bench() {}
}
impl Drop for PostgresNode {
// destructor to clean up state after test is done
// XXX: we may detect failed test by setting some flag in catch_unwind()
// and checking it here. But let's just clean datadirs on start.
fn drop(&mut self) {
if self.is_test {
let _ = self.stop();
}
}
}

control_plane/src/lib.rs

@@ -0,0 +1,12 @@
//
// Local control plane.
//
// Can start, configure and stop postgres instances running as local processes.
//
// Intended to be used in integration tests and in CLI tools for
// local installations.
//
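// A sketch of typical usage (assumes 'zenith init' has already created ~/.zenith):
//
//   let env = local_env::load_config()?;
//   let mut cplane = compute::ComputeControlPlane::load(env)?;
//   let node = cplane.new_node()?;
//   node.start()?;
//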
pub mod compute;
pub mod local_env;
pub mod storage;


@@ -0,0 +1,187 @@
//
// This module is responsible for locating and loading paths in a local setup.
//
// It also provides an init method, which acts as a stub for a proper
// installation script that will use these local paths.
//
use std::env;
use std::error;
use std::fs;
use std::path::{Path, PathBuf};
use serde_derive::{Deserialize, Serialize};
type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
//
// This data structure represents the deserialized zenith config, which is
// expected to be located in ~/.zenith
//
// TODO: should we also support ZENITH_CONF env var?
//
#[derive(Serialize, Deserialize, Clone)]
pub struct LocalEnv {
// Here page server and compute nodes will create and store their data.
pub data_dir: PathBuf,
// Path to the postgres distribution. It is expected that "bin", "include",
// "lib", and "share" from the postgres distribution are present there. If at some
// point we can run against vanilla postgres, we may split this into four
// separate paths to match OS-specific installation layouts.
pub pg_distrib_dir: PathBuf,
// Path to pageserver binary.
pub zenith_distrib_dir: PathBuf,
}
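// As a sketch, the serialized ~/.zenith file written by init() looks like this
// (paths are illustrative):
//
//   data_dir = "/home/user/zenith/tmp_check_cli"
//   pg_distrib_dir = "/home/user/zenith/tmp_install"
//   zenith_distrib_dir = "/home/user/zenith/target/debug"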
impl LocalEnv {
// postgres installation
pub fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("bin")
}
pub fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib")
}
// pageserver
pub fn pageserver_data_dir(&self) -> PathBuf {
self.data_dir.join("pageserver")
}
pub fn pageserver_log(&self) -> PathBuf {
self.pageserver_data_dir().join("pageserver.log")
}
pub fn pageserver_pidfile(&self) -> PathBuf {
self.pageserver_data_dir().join("pageserver.pid")
}
// compute nodes
pub fn compute_dir(&self) -> PathBuf {
self.data_dir.join("compute")
}
}
//
// Issues in the rust-lang repo contain several discussions about the proper
// library for finding the home directory in a cross-platform way. The current
// consensus seems to be the home crate, which cargo itself uses.
//
fn get_home() -> Result<PathBuf> {
home::home_dir().ok_or("can not determine home directory path".into())
}
pub fn init() -> Result<()> {
let home_dir = get_home()?;
// check if config already exists
let cfg_path = home_dir.join(".zenith");
if cfg_path.exists() {
let err_msg = format!(
"{} already exists. Perhaps already initialized?",
cfg_path.to_str().unwrap()
);
return Err(err_msg.into());
}
// For now, init can only be run from the crate directory, so check that the current
// dir is our crate. Use the existence of 'pageserver/Cargo.toml' as evidence.
let cargo_path = env::current_dir()?;
if !cargo_path.join("pageserver/Cargo.toml").exists() {
let err_msg = "Current dirrectory does not look like a zenith repo. \
Please, run 'init' from zenith repo root.";
return Err(err_msg.into());
}
// ok, now check that expected binaries are present
// check postgres
let pg_distrib_dir = cargo_path.join("tmp_install");
let pg_path = pg_distrib_dir.join("bin/postgres");
if !pg_path.exists() {
let err_msg = format!(
"Can't find postres binary at {}. \
Perhaps './pgbuild.sh' is needed to build it first.",
pg_path.to_str().unwrap()
);
return Err(err_msg.into());
}
// check pageserver
let zenith_distrib_dir = cargo_path.join("target/debug/");
let pageserver_path = zenith_distrib_dir.join("pageserver");
if !pageserver_path.exists() {
let err_msg = format!(
"Can't find pageserver binary at {}. Please build it.",
pageserver_path.to_str().unwrap()
);
return Err(err_msg.into());
}
// ok, we are good to go
// create dirs
let data_dir = cargo_path.join("tmp_check_cli");
for &dir in &["compute", "pageserver"] {
fs::create_dir_all(data_dir.join(dir)).map_err(|e| {
format!(
"Failed to create directory in '{}': {}",
data_dir.to_str().unwrap(),
e
)
})?;
}
// write config
let conf = LocalEnv {
data_dir,
pg_distrib_dir,
zenith_distrib_dir,
};
let toml = toml::to_string(&conf)?;
fs::write(cfg_path, toml)?;
Ok(())
}
// check that config file is present
pub fn load_config() -> Result<LocalEnv> {
// home
let home_dir = get_home()?;
// check file exists
let cfg_path = home_dir.join(".zenith");
if !cfg_path.exists() {
let err_msg = format!(
"Zenith config is not found in {}. You need to run 'zenith init' first",
cfg_path.to_str().unwrap()
);
return Err(err_msg.into());
}
// load and parse file
let config = fs::read_to_string(cfg_path)?;
toml::from_str(config.as_str()).map_err(|e| e.into())
}
// local env for tests
pub fn test_env() -> LocalEnv {
let data_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_check");
fs::create_dir_all(data_dir.clone()).unwrap();
LocalEnv {
data_dir,
pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
zenith_distrib_dir: cargo_bin_dir(),
}
}
// Find the directory where the binaries were put (i.e. target/debug/)
pub fn cargo_bin_dir() -> PathBuf {
let mut pathbuf = std::env::current_exe().unwrap();
pathbuf.pop();
if pathbuf.ends_with("deps") {
pathbuf.pop();
}
return pathbuf;
}


@@ -0,0 +1,398 @@
use std::error;
use std::fs;
use std::io;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use postgres::{Client, NoTls};
use crate::compute::PostgresNode;
use crate::local_env::{self, LocalEnv};
type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
//
// Collection of several example deployments useful for tests.
//
// I'm intentionally modelling the storage and compute control planes as separate
// entities, since that is closer to the actual setup.
//
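// A sketch of typical test usage, using only names from this module:
//
//   let cplane = TestStorageControlPlane::fault_tolerant(3);
//   let wal_acceptors = cplane.get_wal_acceptor_conn_info();
//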
pub struct TestStorageControlPlane {
pub wal_acceptors: Vec<WalAcceptorNode>,
pub pageserver: Arc<PageServerNode>,
pub test_done: AtomicBool,
}
impl TestStorageControlPlane {
// postgres <-> page_server
pub fn one_page_server(pgdata_base_path: String) -> TestStorageControlPlane {
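// If pgdata_base_path is empty, start a fresh pageserver; otherwise restore
// the pageserver from that base data directory.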
let env = local_env::test_env();
let pserver = Arc::new(PageServerNode {
env: env.clone(),
kill_on_exit: true,
listen_address: None,
});
pserver.init();
if pgdata_base_path.is_empty() {
pserver.start().unwrap();
} else {
pserver.start_fromdatadir(pgdata_base_path).unwrap();
}
TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: pserver,
test_done: AtomicBool::new(false),
}
}
pub fn one_page_server_no_start() -> TestStorageControlPlane {
let env = local_env::test_env();
let pserver = Arc::new(PageServerNode {
env,
kill_on_exit: true,
listen_address: None,
});
pserver.init();
TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: pserver,
test_done: AtomicBool::new(false),
}
}
// postgres <-> {wal_acceptor1, wal_acceptor2, ...}
pub fn fault_tolerant(redundancy: usize) -> TestStorageControlPlane {
let env = local_env::test_env();
let mut cplane = TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: Arc::new(PageServerNode {
env: env.clone(),
kill_on_exit: true,
listen_address: None,
}),
test_done: AtomicBool::new(false),
};
cplane.pageserver.init();
cplane.pageserver.start().unwrap();
const WAL_ACCEPTOR_PORT: usize = 54321;
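// Spawn 'redundancy' wal_acceptors on consecutive ports starting at WAL_ACCEPTOR_PORT.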
for i in 0..redundancy {
let wal_acceptor = WalAcceptorNode {
listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
.parse()
.unwrap(),
data_dir: env.data_dir.join(format!("wal_acceptor_{}", i)),
env: env.clone(),
};
wal_acceptor.init();
wal_acceptor.start();
cplane.wal_acceptors.push(wal_acceptor);
}
cplane
}
pub fn stop(&self) {
self.test_done.store(true, Ordering::Relaxed);
}
pub fn get_wal_acceptor_conn_info(&self) -> String {
self.wal_acceptors
.iter()
.map(|wa| wa.listen.to_string())
.collect::<Vec<String>>()
.join(",")
}
pub fn is_running(&self) -> bool {
self.test_done.load(Ordering::Relaxed)
}
}
impl Drop for TestStorageControlPlane {
fn drop(&mut self) {
self.stop();
}
}
//
// Control routines for pageserver.
//
// Used in CLI and tests.
//
pub struct PageServerNode {
kill_on_exit: bool,
listen_address: Option<SocketAddr>,
pub env: LocalEnv,
}
impl PageServerNode {
pub fn from_env(env: &LocalEnv) -> PageServerNode {
PageServerNode {
kill_on_exit: false,
listen_address: None, // default
env: env.clone(),
}
}
pub fn address(&self) -> SocketAddr {
match self.listen_address {
Some(addr) => addr,
None => "127.0.0.1:64000".parse().unwrap(),
}
}
pub fn init(&self) {
fs::create_dir_all(self.env.pageserver_data_dir()).unwrap();
}
pub fn start(&self) -> Result<()> {
println!("Starting pageserver at '{}'", self.address());
let status = Command::new(self.env.zenith_distrib_dir.join("pageserver")) // XXX -> method
.args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()])
.args(&["-l", self.address().to_string().as_str()])
.arg("-d")
.env_clear()
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()?;
if !status.success() {
return Err(Box::<dyn error::Error>::from(format!(
"Pageserver failed to start. See '{}' for details.",
self.env.pageserver_log().to_str().unwrap()
)));
} else {
return Ok(());
}
}
pub fn start_fromdatadir(&self, pgdata_base_path: String) -> Result<()> {
println!("Starting pageserver at '{}'", self.address());
let status = Command::new(self.env.zenith_distrib_dir.join("pageserver")) // XXX -> method
.args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()])
.args(&["-l", self.address().to_string().as_str()])
.arg("-d")
.args(&["--restore-from", "local"])
.env_clear()
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGDATA_BASE_PATH", pgdata_base_path)
.status()?;
if !status.success() {
return Err(Box::<dyn error::Error>::from(format!(
"Pageserver failed to start. See '{}' for details.",
self.env.pageserver_log().to_str().unwrap()
)));
} else {
return Ok(());
}
}
pub fn stop(&self) -> Result<()> {
let pidfile = self.env.pageserver_pidfile();
let pid = read_pidfile(&pidfile)?;
let status = Command::new("kill")
.arg(&pid)
.env_clear()
.status()
.expect("failed to execute kill");
if !status.success() {
return Err(Box::<dyn error::Error>::from(format!(
"Failed to kill pageserver with pid {}",
pid
)));
}
// wait for the pageserver to stop
for _ in 0..5 {
let stream = TcpStream::connect(self.address());
if let Err(_e) = stream {
return Ok(());
}
println!("Waiting for pageserver on {} to stop", self.address());
thread::sleep(Duration::from_secs(1));
}
// we failed to stop the pageserver within the timeout; report an error
Err(Box::<dyn error::Error>::from(format!(
"Failed to stop pageserver with pid {}",
pid
)))
}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address().ip(),
self.address().port(),
"no_db",
"no_user",
);
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Pageserver query: '{}'", sql);
client.simple_query(sql).unwrap()
}
}
impl Drop for PageServerNode {
fn drop(&mut self) {
if self.kill_on_exit {
let _ = self.stop();
}
}
}
//
// Control routines for WalAcceptor.
//
// Now used only in test setups.
//
pub struct WalAcceptorNode {
listen: SocketAddr,
data_dir: PathBuf,
env: LocalEnv,
}
impl WalAcceptorNode {
pub fn init(&self) {
if self.data_dir.exists() {
fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
fs::create_dir_all(self.data_dir.clone()).unwrap();
}
pub fn start(&self) {
println!(
"Starting wal_acceptor in {} listening '{}'",
self.data_dir.to_str().unwrap(),
self.listen
);
let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.arg("-d")
.arg("-n")
.status()
.expect("failed to start wal_acceptor");
if !status.success() {
panic!("wal_acceptor start failed");
}
}
pub fn stop(&self) -> std::result::Result<(), io::Error> {
println!("Stopping wal acceptor on {}", self.listen);
let pidfile = self.data_dir.join("wal_acceptor.pid");
let pid = read_pidfile(&pidfile)?;
// Ignores any failures when running this command
let _status = Command::new("kill")
.arg(pid)
.env_clear()
.status()
.expect("failed to execute kill");
Ok(())
}
}
impl Drop for WalAcceptorNode {
fn drop(&mut self) {
self.stop().unwrap();
}
}
///////////////////////////////////////////////////////////////////////////////
pub struct WalProposerNode {
pub pid: u32,
}
impl WalProposerNode {
pub fn stop(&self) {
let status = Command::new("kill")
.arg(self.pid.to_string())
.env_clear()
.status()
.expect("failed to execute kill");
if !status.success() {
panic!("kill start failed");
}
}
}
impl Drop for WalProposerNode {
fn drop(&mut self) {
self.stop();
}
}
///////////////////////////////////////////////////////////////////////////////
pub fn regress_check(pg: &PostgresNode) {
pg.safe_psql("postgres", "CREATE DATABASE regression");
let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress");
fs::create_dir_all(regress_run_path.clone()).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
.env("PGHOST", pg.address.ip().to_string())
.env("PGPORT", pg.address.port().to_string())
.env("PGUSER", pg.whoami())
.status()
.expect("pg_regress failed");
}
/// Read a PID file
///
/// This should contain an unsigned integer, but we return it as a String
/// because our callers only want to pass it back into a subcommand.
fn read_pidfile(pidfile: &Path) -> std::result::Result<String, io::Error> {
fs::read_to_string(pidfile).map_err(|err| {
eprintln!("failed to read pidfile {:?}: {:?}", pidfile, err);
err
})
}

integration_tests/.gitignore

@@ -0,0 +1 @@
tmp_check/


@@ -9,8 +9,7 @@ edition = "2018"
[dependencies]
lazy_static = "1.4.0"
rand = "0.8.3"
postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
pageserver = { path = "../pageserver" }
walkeeper = { path = "../walkeeper" }
control_plane = { path = "../control_plane" }


@@ -1,692 +0,0 @@
//
// Local control plane.
//
// Can start, cofigure and stop postgres instances running as a local processes.
//
// Intended to be used in integration tests and in CLI tools for
// local installations.
//
use std::fs::File;
use std::fs::{self, OpenOptions};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::str;
use std::sync::Arc;
use std::{
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use lazy_static::lazy_static;
use postgres::{Client, NoTls};
lazy_static! {
// postgres would be there if it was build by 'make postgres' here in the repo
pub static ref PG_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../tmp_install/bin");
pub static ref PG_LIB_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../tmp_install/lib");
pub static ref BIN_DIR : PathBuf = cargo_bin_dir();
pub static ref TEST_WORKDIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tmp_check");
}
// Find the directory where the binaries were put (i.e. target/debug/)
pub fn cargo_bin_dir() -> PathBuf {
let mut pathbuf = std::env::current_exe().ok().unwrap();
pathbuf.pop();
if pathbuf.ends_with("deps") {
pathbuf.pop();
}
return pathbuf;
}
//
// I'm intendedly modelling storage and compute control planes as a separate entities
// as it is closer to the actual setup.
//
pub struct StorageControlPlane {
pub wal_acceptors: Vec<WalAcceptorNode>,
pub page_servers: Vec<PageServerNode>,
}
impl StorageControlPlane {
// postgres <-> page_server
pub fn one_page_server() -> StorageControlPlane {
let mut cplane = StorageControlPlane {
wal_acceptors: Vec::new(),
page_servers: Vec::new(),
};
let pserver = PageServerNode {
page_service_addr: "127.0.0.1:65200".parse().unwrap(),
data_dir: TEST_WORKDIR.join("pageserver"),
};
pserver.init();
pserver.start();
cplane.page_servers.push(pserver);
cplane
}
pub fn fault_tolerant(redundancy: usize) -> StorageControlPlane {
let mut cplane = StorageControlPlane {
wal_acceptors: Vec::new(),
page_servers: Vec::new(),
};
const WAL_ACCEPTOR_PORT: usize = 54321;
for i in 0..redundancy {
let wal_acceptor = WalAcceptorNode {
listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
.parse()
.unwrap(),
data_dir: TEST_WORKDIR.join(format!("wal_acceptor_{}", i)),
};
wal_acceptor.init();
wal_acceptor.start();
cplane.wal_acceptors.push(wal_acceptor);
}
cplane
}
pub fn stop(&self) {
for wa in self.wal_acceptors.iter() {
wa.stop();
}
}
// // postgres <-> wal_acceptor x3 <-> page_server
// fn local(&mut self) -> StorageControlPlane {
// }
pub fn page_server_addr(&self) -> &SocketAddr {
&self.page_servers[0].page_service_addr
}
pub fn get_wal_acceptor_conn_info(&self) -> String {
self.wal_acceptors
.iter()
.map(|wa| wa.listen.to_string().to_string())
.collect::<Vec<String>>()
.join(",")
}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let addr = &self.page_servers[0].page_service_addr;
let connstring = format!(
"host={} port={} dbname={} user={}",
addr.ip(),
addr.port(),
"no_db",
"no_user",
);
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Pageserver query: '{}'", sql);
client.simple_query(sql).unwrap()
}
}
impl Drop for StorageControlPlane {
fn drop(&mut self) {
self.stop();
}
}
pub struct PageServerNode {
page_service_addr: SocketAddr,
data_dir: PathBuf,
}
impl PageServerNode {
// TODO: method to force redo on a specific relation
// TODO: make wal-redo-postgres workable without data directory?
pub fn init(&self) {
fs::create_dir_all(self.data_dir.clone()).unwrap();
let datadir_path = self.data_dir.join("wal_redo_pgdata");
fs::remove_dir_all(datadir_path.to_str().unwrap()).ok();
let initdb = Command::new(PG_BIN_DIR.join("initdb"))
.args(&["-D", datadir_path.to_str().unwrap()])
.arg("-N")
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.status()
.expect("failed to execute initdb");
if !initdb.success() {
panic!("initdb failed");
}
}
pub fn start(&self) {
println!("Starting pageserver at '{}'", self.page_service_addr);
let status = Command::new(BIN_DIR.join("pageserver"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.page_service_addr.to_string().as_str()])
.arg("-d")
.arg("--skip-recovery")
.env_clear()
.env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.status()
.expect("failed to start pageserver");
if !status.success() {
panic!("pageserver start failed");
}
}
pub fn stop(&self) {
let pidfile = self.data_dir.join("pageserver.pid");
let pid = fs::read_to_string(pidfile).unwrap();
let status = Command::new("kill")
.arg(pid)
.env_clear()
.status()
.expect("failed to execute kill");
if !status.success() {
panic!("kill start failed");
}
}
}
impl Drop for PageServerNode {
fn drop(&mut self) {
self.stop();
// fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
}
pub struct WalAcceptorNode {
listen: SocketAddr,
data_dir: PathBuf,
}
impl WalAcceptorNode {
pub fn init(&self) {
if self.data_dir.exists() {
fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
fs::create_dir_all(self.data_dir.clone()).unwrap();
}
pub fn start(&self) {
println!(
"Starting wal_acceptor in {} listening '{}'",
self.data_dir.to_str().unwrap(),
self.listen
);
let status = Command::new(BIN_DIR.join("wal_acceptor"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.arg("-d")
.arg("-n")
.status()
.expect("failed to start wal_acceptor");
if !status.success() {
panic!("wal_acceptor start failed");
}
}
pub fn stop(&self) {
let pidfile = self.data_dir.join("wal_acceptor.pid");
if let Ok(pid) = fs::read_to_string(pidfile) {
let _status = Command::new("kill")
.arg(pid)
.env_clear()
.status()
.expect("failed to execute kill");
}
}
}
impl Drop for WalAcceptorNode {
fn drop(&mut self) {
self.stop();
// fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
}
///////////////////////////////////////////////////////////////////////////////
//
// ComputeControlPlane
//
pub struct ComputeControlPlane<'a> {
pg_bin_dir: PathBuf,
work_dir: PathBuf,
last_assigned_port: u16,
storage_cplane: &'a StorageControlPlane,
nodes: Vec<Arc<PostgresNode>>,
}
impl ComputeControlPlane<'_> {
pub fn local(storage_cplane: &StorageControlPlane) -> ComputeControlPlane {
ComputeControlPlane {
pg_bin_dir: PG_BIN_DIR.to_path_buf(),
work_dir: TEST_WORKDIR.to_path_buf(),
last_assigned_port: 65431,
storage_cplane: storage_cplane,
nodes: Vec::new(),
}
}
// TODO: check port availability and
fn get_port(&mut self) -> u16 {
let port = self.last_assigned_port + 1;
self.last_assigned_port += 1;
port
}
pub fn new_vanilla_node<'a>(&mut self) -> &Arc<PostgresNode> {
// allocate new node entry with generated port
let node_id = self.nodes.len() + 1;
let node = PostgresNode {
_node_id: node_id,
port: self.get_port(),
ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
pg_bin_dir: self.pg_bin_dir.clone(),
};
self.nodes.push(Arc::new(node));
let node = self.nodes.last().unwrap();
// initialize data directory
fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
let initdb_path = self.pg_bin_dir.join("initdb");
println!("initdb_path: {}", initdb_path.to_str().unwrap());
let initdb = Command::new(initdb_path)
.args(&["-D", node.pgdata.to_str().unwrap()])
.arg("-N")
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.status()
.expect("failed to execute initdb");
if !initdb.success() {
panic!("initdb failed");
}
// // allow local replication connections
// node.append_conf("pg_hba.conf", format!("\
// host replication all {}/32 sspi include_realm=1 map=regress\n\
// ", node.ip).as_str());
// listen for selected port
node.append_conf(
"postgresql.conf",
format!(
"\
max_wal_senders = 10\n\
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
fsync = off\n\
max_connections = 100\n\
wal_level = replica\n\
wal_sender_timeout = 0\n\
listen_addresses = '{address}'\n\
port = {port}\n\
",
address = node.ip,
port = node.port
)
.as_str(),
);
node
}
// Init compute node without files, only datadir structure
// use initdb --compute-node flag and GUC 'computenode_mode'
// to distinguish the node
pub fn new_minimal_node(&mut self) -> &PostgresNode {
// allocate new node entry with generated port
let node_id = self.nodes.len() + 1;
let node = PostgresNode {
_node_id: node_id,
port: self.get_port(),
ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
pg_bin_dir: self.pg_bin_dir.clone(),
};
self.nodes.push(Arc::new(node));
let node = self.nodes.last().unwrap();
// initialize data directory w/o files
fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
let initdb_path = self.pg_bin_dir.join("initdb");
println!("initdb_path: {}", initdb_path.to_str().unwrap());
let initdb = Command::new(initdb_path)
.args(&["-D", node.pgdata.to_str().unwrap()])
.arg("-N")
.arg("--no-instructions")
.arg("--compute-node")
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.status()
.expect("failed to execute initdb");
if !initdb.success() {
panic!("initdb failed");
}
// listen for selected port
node.append_conf(
"postgresql.conf",
format!(
"\
max_wal_senders = 10\n\
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
fsync = off\n\
max_connections = 100\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n\
computenode_mode = true\n\
",
address = node.ip,
port = node.port
)
.as_str(),
);
node
}
pub fn new_node(&mut self) -> Arc<PostgresNode> {
let storage_cplane = self.storage_cplane;
let node = self.new_vanilla_node();
let pserver = storage_cplane.page_server_addr();
// Configure that node to take pages from pageserver
node.append_conf(
"postgresql.conf",
format!(
"\
page_server_connstring = 'host={} port={}'\n\
",
pserver.ip(),
pserver.port()
)
.as_str(),
);
node.clone()
}
pub fn new_master_node(&mut self) -> Arc<PostgresNode> {
let node = self.new_vanilla_node();
node.append_conf(
"postgresql.conf",
"synchronous_standby_names = 'safekeeper_proxy'\n\
",
);
node.clone()
}
}
///////////////////////////////////////////////////////////////////////////////
pub struct WalProposerNode {
pid: u32,
}
impl WalProposerNode {
pub fn stop(&self) {
let status = Command::new("kill")
.arg(self.pid.to_string())
.env_clear()
.status()
.expect("failed to execute kill");
if !status.success() {
panic!("kill start failed");
}
}
}
impl Drop for WalProposerNode {
fn drop(&mut self) {
self.stop();
}
}
///////////////////////////////////////////////////////////////////////////////
pub struct PostgresNode {
_node_id: usize,
pub port: u16,
pub ip: IpAddr,
pgdata: PathBuf,
pg_bin_dir: PathBuf,
}
impl PostgresNode {
pub fn append_conf(&self, config: &str, opts: &str) {
OpenOptions::new()
.append(true)
.open(self.pgdata.join(config).to_str().unwrap())
.unwrap()
.write_all(opts.as_bytes())
.unwrap();
}
fn pg_ctl(&self, args: &[&str], check_ok: bool) {
let pg_ctl_path = self.pg_bin_dir.join("pg_ctl");
let pg_ctl = Command::new(pg_ctl_path)
.args(
[
&[
"-D",
self.pgdata.to_str().unwrap(),
"-l",
self.pgdata.join("log").to_str().unwrap(),
],
args,
]
.concat(),
)
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.status()
.expect("failed to execute pg_ctl");
if check_ok && !pg_ctl.success() {
panic!("pg_ctl failed");
}
}
pub fn start(&self, storage_cplane: &StorageControlPlane) {
if storage_cplane.page_servers.len() != 0 {
let _res =
storage_cplane.page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
}
println!("Starting postgres node at '{}'", self.connstr());
self.pg_ctl(&["start"], true);
}
pub fn restart(&self) {
self.pg_ctl(&["restart"], true);
}
pub fn stop(&self) {
self.pg_ctl(&["-m", "immediate", "stop"], true);
}
pub fn connstr(&self) -> String {
format!("host={} port={} user={}", self.ip, self.port, self.whoami())
}
// XXX: cache that in control plane
pub fn whoami(&self) -> String {
let output = Command::new("whoami")
.output()
.expect("failed to execute whoami");
if !output.status.success() {
panic!("whoami failed");
}
String::from_utf8(output.stdout).unwrap().trim().to_string()
}
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.ip,
self.port,
db,
self.whoami()
);
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Running {}", sql);
client.query(sql, &[]).unwrap()
}
pub fn open_psql(&self, db: &str) -> Client {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.ip,
self.port,
db,
self.whoami()
);
Client::connect(connstring.as_str(), NoTls).unwrap()
}
pub fn get_pgdata(&self) -> Option<&str> {
self.pgdata.to_str()
}
/* Create stub controlfile and respective xlog to start computenode */
pub fn setup_controlfile(&self) {
let filepath = format!("{}/global/pg_control", self.pgdata.to_str().unwrap());
{
File::create(filepath).unwrap();
}
let pg_resetwal_path = self.pg_bin_dir.join("pg_resetwal");
let pg_resetwal = Command::new(pg_resetwal_path)
.args(&["-D", self.pgdata.to_str().unwrap()])
.arg("-f")
// TODO probably we will have to modify pg_resetwal
// .arg("--compute-node")
.status()
.expect("failed to execute pg_resetwal");
if !pg_resetwal.success() {
panic!("pg_resetwal failed");
}
}
pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode {
let proxy_path = PG_BIN_DIR.join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["-s", &wal_acceptors])
.args(&["-h", &self.ip.to_string()])
.args(&["-p", &self.port.to_string()])
.arg("-v")
.stderr(File::create(TEST_WORKDIR.join("safepkeeper_proxy.log")).unwrap())
.spawn()
{
Ok(child) => WalProposerNode { pid: child.id() },
Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e),
}
}
pub fn pg_regress(&self) {
self.safe_psql("postgres", "CREATE DATABASE regression");
let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress");
fs::create_dir_all(regress_run_path.clone()).unwrap();
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", PG_BIN_DIR.to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.env("PGPORT", self.port.to_string())
.env("PGUSER", self.whoami())
.env("PGHOST", self.ip.to_string())
.status()
.expect("pg_regress failed");
}
pub fn pg_bench(&self, clients: u32, seconds: u32) {
let port = self.port.to_string();
let clients = clients.to_string();
let seconds = seconds.to_string();
let _pg_bench_init = Command::new(PG_BIN_DIR.join("pgbench"))
.args(&["-i", "-p", port.as_str(), "postgres"])
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.status()
.expect("pgbench -i");
let _pg_bench_run = Command::new(PG_BIN_DIR.join("pgbench"))
.args(&[
"-p",
port.as_str(),
"-T",
seconds.as_str(),
"-P",
"1",
"-c",
clients.as_str(),
"-M",
"prepared",
"postgres",
])
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.status()
.expect("pgbench run");
}
}
impl Drop for PostgresNode {
// destructor to clean up state after test is done
// XXX: we may detect failed test by setting some flag in catch_unwind()
// and checking it here. But let just clean datadirs on start.
fn drop(&mut self) {
self.stop();
// fs::remove_dir_all(self.pgdata.clone()).unwrap();
}
}


@@ -1,8 +1,9 @@
#[allow(dead_code)]
mod control_plane;
// mod control_plane;
use control_plane::compute::ComputeControlPlane;
use control_plane::storage::TestStorageControlPlane;
use control_plane::ComputeControlPlane;
use control_plane::StorageControlPlane;
use std::thread::sleep;
use std::time::Duration;
// XXX: force all redo at the end
// -- restart + seqscan won't read deleted stuff
@@ -12,12 +13,12 @@ use control_plane::StorageControlPlane;
#[test]
fn test_redo_cases() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// start postgres
let node = compute_cplane.new_node();
node.start(&storage_cplane);
let node = compute_cplane.new_test_node();
node.start().unwrap();
// check basic work with table
node.safe_psql(
@@ -49,15 +50,17 @@ fn test_redo_cases() {
// Runs pg_regress on a compute node
#[test]
#[ignore]
fn test_regress() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// start postgres
let node = compute_cplane.new_node();
node.start(&storage_cplane);
let node = compute_cplane.new_test_node();
node.start().unwrap();
<<<<<<< HEAD
node.pg_regress();
}
@@ -73,20 +76,23 @@ fn pgbench() {
node.start(&storage_cplane);
node.pg_bench(10, 100);
=======
control_plane::storage::regress_check(&node);
>>>>>>> main
}
// Run two postgres instances on one pageserver
#[test]
fn test_pageserver_multitenancy() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// Allocate postgres instance, but don't start
let node1 = compute_cplane.new_node();
let node2 = compute_cplane.new_node();
node1.start(&storage_cplane);
node2.start(&storage_cplane);
let node1 = compute_cplane.new_test_node();
let node2 = compute_cplane.new_test_node();
node1.start().unwrap();
node2.start().unwrap();
// check node1
node1.safe_psql(
@@ -122,3 +128,36 @@ fn test_pageserver_multitenancy() {
println!("sum = {}", count);
assert_eq!(count, 15000150000);
}
#[test]
fn test_upload_pageserver_local() {
// Init pageserver that reads WAL directly from that postgres
// Don't start yet
let storage_cplane = TestStorageControlPlane::one_page_server_no_start();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// init postgres node
let node = compute_cplane.new_test_node();
// upload data to pageserver & start it
storage_cplane
.pageserver
.start_fromdatadir(node.pgdata().to_str().unwrap().to_string())
.unwrap();
sleep(Duration::from_secs(10));
// start postgres node
node.start().unwrap();
// check basic work with table
node.safe_psql(
"postgres",
"CREATE TABLE t(key int primary key, value text)",
);
node.safe_psql(
"postgres",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
);
}


@@ -1,8 +1,6 @@
// Restart acceptors one by one while compute is under the load.
#[allow(dead_code)]
mod control_plane;
use control_plane::ComputeControlPlane;
use control_plane::StorageControlPlane;
use control_plane::compute::ComputeControlPlane;
use control_plane::storage::TestStorageControlPlane;
use rand::Rng;
use std::sync::Arc;
@@ -13,13 +11,13 @@ use std::{thread, time};
fn test_acceptors_normal_work() {
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 3;
let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgre
let node = compute_cplane.new_master_node();
node.start(&storage_cplane);
// start postgres
let node = compute_cplane.new_test_master_node();
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
@@ -43,6 +41,53 @@ fn test_acceptors_normal_work() {
// check wal files equality
}
#[test]
fn test_multitenancy() {
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 3;
const N_NODES: usize = 5;
let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
let mut nodes = Vec::new();
let mut proxies = Vec::new();
for _ in 0..N_NODES {
let node = compute_cplane.new_test_master_node();
nodes.push(node);
nodes.last().unwrap().start().unwrap();
proxies.push(nodes.last().unwrap().start_proxy(wal_acceptors.clone()));
}
// create schema
for node in &nodes {
node.safe_psql(
"postgres",
"CREATE TABLE t(key int primary key, value text)",
);
}
// Populate data
for node in &nodes {
node.safe_psql(
"postgres",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
);
}
// Check data
for node in &nodes {
let count: i64 = node
.safe_psql("postgres", "SELECT sum(key) FROM t")
.first()
.unwrap()
.get(0);
println!("sum = {}", count);
assert_eq!(count, 5000050000);
}
}
// Majority is always alive
#[test]
fn test_acceptors_restarts() {
@@ -50,14 +95,14 @@ fn test_acceptors_restarts() {
const REDUNDANCY: usize = 3;
const FAULT_PROBABILITY: f32 = 0.01;
let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
let mut rng = rand::thread_rng();
// start postgre
let node = compute_cplane.new_master_node();
node.start(&storage_cplane);
// start postgres
let node = compute_cplane.new_test_master_node();
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
@@ -80,7 +125,7 @@ fn test_acceptors_restarts() {
} else {
let node: usize = rng.gen_range(0..REDUNDANCY);
failed_node = Some(node);
storage_cplane.wal_acceptors[node].stop();
storage_cplane.wal_acceptors[node].stop().unwrap();
}
}
}
@@ -93,7 +138,7 @@ fn test_acceptors_restarts() {
assert_eq!(count, 500500);
}
fn start_acceptor(cplane: &Arc<StorageControlPlane>, no: usize) {
fn start_acceptor(cplane: &Arc<TestStorageControlPlane>, no: usize) {
let cp = cplane.clone();
thread::spawn(move || {
thread::sleep(time::Duration::from_secs(1));
@@ -109,13 +154,13 @@ fn test_acceptors_unavalability() {
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 2;
let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgre
let node = compute_cplane.new_master_node();
node.start(&storage_cplane);
// start postgres
let node = compute_cplane.new_test_master_node();
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
@@ -129,7 +174,7 @@ fn test_acceptors_unavalability() {
psql.execute("INSERT INTO t values (1, 'payload')", &[])
.unwrap();
storage_cplane.wal_acceptors[0].stop();
storage_cplane.wal_acceptors[0].stop().unwrap();
let cp = Arc::new(storage_cplane);
start_acceptor(&cp, 0);
let now = SystemTime::now();
@@ -139,7 +184,7 @@ fn test_acceptors_unavalability() {
psql.execute("INSERT INTO t values (3, 'payload')", &[])
.unwrap();
cp.wal_acceptors[1].stop();
cp.wal_acceptors[1].stop().unwrap();
start_acceptor(&cp, 1);
psql.execute("INSERT INTO t values (4, 'payload')", &[])
.unwrap();
@@ -157,16 +202,16 @@ fn test_acceptors_unavalability() {
assert_eq!(count, 15);
}
fn simulate_failures(cplane: &Arc<StorageControlPlane>) {
fn simulate_failures(cplane: Arc<TestStorageControlPlane>) {
let mut rng = rand::thread_rng();
let n_acceptors = cplane.wal_acceptors.len();
let failure_period = time::Duration::from_secs(1);
loop {
while cplane.is_running() {
thread::sleep(failure_period);
let mask: u32 = rng.gen_range(0..(1 << n_acceptors));
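// Each set bit in the random mask picks one acceptor to stop this round,
// so any subset of the acceptors (possibly all of them) can fail at once.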
for i in 0..n_acceptors {
if (mask & (1 << i)) != 0 {
cplane.wal_acceptors[i].stop();
cplane.wal_acceptors[i].stop().unwrap();
}
}
thread::sleep(failure_period);
@@ -184,13 +229,13 @@ fn test_race_conditions() {
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 3;
let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = Arc::new(TestStorageControlPlane::fault_tolerant(REDUNDANCY));
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgre
let node = compute_cplane.new_master_node();
node.start(&storage_cplane);
// start postgres
let node = compute_cplane.new_test_master_node();
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
@@ -200,10 +245,10 @@ fn test_race_conditions() {
"postgres",
"CREATE TABLE t(key int primary key, value text)",
);
let cplane = Arc::new(storage_cplane);
let cp = cplane.clone();
thread::spawn(move || {
simulate_failures(&cp);
let cp = storage_cplane.clone();
let failures_thread = thread::spawn(move || {
simulate_failures(cp);
});
let mut psql = node.open_psql("postgres");
@@ -218,5 +263,7 @@ fn test_race_conditions() {
.get(0);
println!("sum = {}", count);
assert_eq!(count, 500500);
cplane.stop();
storage_cplane.stop();
failures_thread.join().unwrap();
}


@@ -26,7 +26,7 @@ clap = "2.33.0"
termion = "1.5.6"
tui = "0.14.0"
daemonize = "0.4.1"
rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] }
rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", rev="7f15a24ec7daa0a5d9516da706212745f9042818", features = ["no-verify-ssl"] }
tokio = { version = "1.3.0", features = ["full"] }
tokio-stream = { version = "0.1.4" }
tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
@@ -35,3 +35,5 @@ postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replica
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb.git" }
anyhow = "1.0"
crc32c = "0.6.0"
walkdir = "2"
thiserror = "1.0"


@@ -6,24 +6,24 @@ use log::*;
use std::fs;
use std::io;
use std::path::PathBuf;
use std::process::exit;
use std::thread;
use std::{fs::OpenOptions, str::FromStr};
use std::fs::OpenOptions;
use anyhow::{Context, Result};
use clap::{App, Arg};
use daemonize::Daemonize;
use slog;
use slog::Drain;
use slog_scope;
use slog_stdlog;
use pageserver::page_service;
use pageserver::restore_datadir;
use pageserver::restore_s3;
use pageserver::tui;
use pageserver::walreceiver;
use pageserver::PageServerConf;
fn main() -> Result<(), io::Error> {
fn main() -> Result<()> {
let arg_matches = App::new("Zenith page server")
.about("Materializes WAL stream to pages and serves them to the postgres")
.arg(Arg::with_name("datadir")
@@ -51,10 +51,10 @@ fn main() -> Result<(), io::Error> {
.long("daemonize")
.takes_value(false)
.help("Run in the background"))
.arg(Arg::with_name("skip_recovery")
.long("skip-recovery")
.takes_value(false)
.help("Skip S3 recovery procedy and start empty"))
.arg(Arg::with_name("restore_from")
.long("restore-from")
.takes_value(true)
.help("Upload data from s3 or datadir"))
.get_matches();
let mut conf = PageServerConf {
@@ -63,7 +63,7 @@ fn main() -> Result<(), io::Error> {
interactive: false,
wal_producer_connstr: None,
listen_addr: "127.0.0.1:5430".parse().unwrap(),
skip_recovery: false,
restore_from: String::new(),
};
if let Some(dir) = arg_matches.value_of("datadir") {
@@ -79,31 +79,29 @@ fn main() -> Result<(), io::Error> {
}
if conf.daemonize && conf.interactive {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"--daemonize is not allowed with --interactive: choose one",
));
eprintln!("--daemonize is not allowed with --interactive: choose one");
exit(1);
}
if arg_matches.is_present("skip_recovery") {
conf.skip_recovery = true;
if let Some(restore_from) = arg_matches.value_of("restore_from") {
conf.restore_from = String::from(restore_from);
}
if let Some(addr) = arg_matches.value_of("wal_producer") {
conf.wal_producer_connstr = Some(String::from_str(addr).unwrap());
conf.wal_producer_connstr = Some(String::from(addr));
}
if let Some(addr) = arg_matches.value_of("listen") {
conf.listen_addr = addr.parse().unwrap();
conf.listen_addr = addr.parse()?;
}
start_pageserver(conf)
start_pageserver(&conf)
}
fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
fn start_pageserver(conf: &PageServerConf) -> Result<()> {
// Initialize logger
let _scope_guard = init_logging(&conf);
let _log_guard = slog_stdlog::init().unwrap();
let _scope_guard = init_logging(&conf)?;
let _log_guard = slog_stdlog::init()?;
// Note: this `info!(...)` macro comes from `log` crate
info!("standard logging redirected to slog");
@@ -127,18 +125,15 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
if conf.daemonize {
info!("daemonizing...");
// There should'n be any logging to stdin/stdout. Redirect it to the main log so
// that we will see any accidental manual fpritf's or backtraces.
// There shouldn't be any logging to stdin/stdout. Redirect it to the main log so
// that we will see any accidental manual fprintf's or backtraces.
let log_filename = conf.data_dir.join("pageserver.log");
let stdout = OpenOptions::new()
.create(true)
.append(true)
.open(conf.data_dir.join("pageserver-stdout.log"))
.unwrap();
let stderr = OpenOptions::new()
.create(true)
.append(true)
.open(conf.data_dir.join("pageserver-stderr.log"))
.unwrap();
.open(&log_filename)
.with_context(|| format!("failed to open {:?}", log_filename))?;
let stderr = stdout.try_clone()?;
let daemonize = Daemonize::new()
.pid_file(conf.data_dir.join("pageserver.pid"))
@@ -154,13 +149,17 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
let mut threads = Vec::new();
info!("starting...");
info!("starting... {}", conf.restore_from);
// Before opening up for connections, restore the latest base backup from S3.
// (We don't persist anything to local disk at the moment, so we need to do
// this at every startup)
if !conf.skip_recovery {
if conf.restore_from.eq("s3") {
info!("restore-from s3...");
restore_s3::restore_main(&conf);
} else if conf.restore_from.eq("local") {
info!("restore-from local...");
restore_datadir::restore_main(&conf);
}
// Create directory for wal-redo datadirs
@@ -169,7 +168,7 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
Err(e) => match e.kind() {
io::ErrorKind::AlreadyExists => {}
_ => {
panic!("Failed to create wal-redo data directory: {}", e);
anyhow::bail!("Failed to create wal-redo data directory: {}", e);
}
},
}
@@ -181,13 +180,13 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
//
// All other wal receivers are started on demand by "callmemaybe" command
// sent to pageserver.
let conf_copy = conf.clone();
if let Some(wal_producer) = conf.wal_producer_connstr {
let conf = conf_copy.clone();
if let Some(wal_producer) = &conf.wal_producer_connstr {
let conf_copy = conf.clone();
let wal_producer = wal_producer.clone();
let walreceiver_thread = thread::Builder::new()
.name("static WAL receiver thread".into())
.spawn(move || {
walreceiver::thread_main(conf, &wal_producer);
walreceiver::thread_main(&conf_copy, &wal_producer);
})
.unwrap();
threads.push(walreceiver_thread);
@@ -195,12 +194,12 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
// GetPage@LSN requests are served by another thread. (It uses async I/O,
// but the code in page_service sets up its own thread pool for that)
let conf = conf_copy.clone();
let conf_copy = conf.clone();
let page_server_thread = thread::Builder::new()
.name("Page Service thread".into())
.spawn(|| {
.spawn(move || {
// thread code
page_service::thread_main(conf);
page_service::thread_main(&conf_copy);
})
.unwrap();
threads.push(page_server_thread);
@@ -217,16 +216,20 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
Ok(())
}
fn init_logging(conf: &PageServerConf) -> slog_scope::GlobalLoggerGuard {
fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard, io::Error> {
if conf.interactive {
tui::init_logging()
Ok(tui::init_logging())
} else if conf.daemonize {
let log = conf.data_dir.join("pageserver.log");
let log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log)
.unwrap_or_else(|_| panic!("Could not create log file"));
.map_err(|err| {
    eprintln!("Could not create log file {:?}: {}", log, err);
    err
})?;
let decorator = slog_term::PlainSyncDecorator::new(log_file);
let drain = slog_term::CompactFormat::new(decorator).build();
let drain = slog::Filter::new(drain, |record: &slog::Record| {
@@ -237,7 +240,7 @@ fn init_logging(conf: &PageServerConf) -> slog_scope::GlobalLoggerGuard {
});
let drain = std::sync::Mutex::new(drain).fuse();
let logger = slog::Logger::root(drain, slog::o!());
slog_scope::set_global_logger(logger)
Ok(slog_scope::set_global_logger(logger))
} else {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
@@ -255,6 +258,6 @@ fn init_logging(conf: &PageServerConf) -> slog_scope::GlobalLoggerGuard {
})
.fuse();
let logger = slog::Logger::root(drain, slog::o!());
slog_scope::set_global_logger(logger)
Ok(slog_scope::set_global_logger(logger))
}
}


@@ -3,6 +3,8 @@ use std::path::PathBuf;
pub mod page_cache;
pub mod page_service;
pub mod pg_constants;
pub mod restore_datadir;
pub mod restore_s3;
pub mod tui;
pub mod tui_event;
@@ -11,7 +13,6 @@ pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PageServerConf {
pub data_dir: PathBuf,
@@ -19,5 +20,5 @@ pub struct PageServerConf {
pub interactive: bool,
pub wal_producer_connstr: Option<String>,
pub listen_addr: SocketAddr,
pub skip_recovery: bool,
pub restore_from: String,
}


@@ -6,24 +6,24 @@
// per-entry mutex.
//
use crate::{walredo, PageServerConf};
use anyhow::bail;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::error::Error;
use core::ops::Bound::Included;
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
use lazy_static::lazy_static;
use log::*;
use rand::Rng;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use std::{convert::TryInto, ops::AddAssign};
// use tokio::sync::RwLock;
use lazy_static::lazy_static;
use log::*;
use rocksdb::*;
use std::collections::HashMap;
use crate::{walredo, PageServerConf};
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
// Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
@@ -112,7 +112,7 @@ lazy_static! {
pub static ref PAGECACHES: Mutex<HashMap<u64, Arc<PageCache>>> = Mutex::new(HashMap::new());
}
pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc<PageCache> {
pub fn get_pagecache(conf: &PageServerConf, sys_id: u64) -> Arc<PageCache> {
let mut pcaches = PAGECACHES.lock().unwrap();
if !pcaches.contains_key(&sys_id) {
@@ -123,10 +123,11 @@ pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc<PageCache> {
// The join_handle is not saved anywhere, so we won't try to restart the thread
// if it dies. We may later stop these threads after some inactivity period
// and restart them on demand.
let conf = conf.clone();
let _walredo_thread = thread::Builder::new()
.name("WAL redo thread".into())
.spawn(move || {
walredo::wal_redo_main(conf, sys_id);
walredo::wal_redo_main(&conf, sys_id);
})
.unwrap();
}
@@ -252,6 +253,19 @@ impl CacheEntryContent {
}
}
impl CacheEntry {
fn new(key: CacheKey) -> CacheEntry {
CacheEntry {
key,
content: Mutex::new(CacheEntryContent {
page_image: None,
wal_record: None,
apply_pending: false,
}),
}
}
}
impl CacheEntry {
fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry {
CacheEntry {
@@ -345,7 +359,7 @@ impl PageCache {
//
// Returns an 8k page image
//
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>> {
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
@@ -372,15 +386,11 @@ impl PageCache {
shared = wait_result.0;
if wait_result.1.timed_out() {
error!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
);
return Err(format!(
bail!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
))?;
);
}
}
if waited {
@@ -388,11 +398,23 @@ impl PageCache {
}
if lsn < shared.first_valid_lsn {
return Err(format!(
bail!(
"LSN {:X}/{:X} has already been removed",
lsn >> 32,
lsn & 0xffff_ffff
))?;
);
}
let pagecache = &shared.pagecache;
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
let entry_opt = entries.next_back();
if entry_opt.is_none() {
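// No page image or WAL record for this block. Return an all-zeros page,
// on the assumption that a block that was never written reads as zeros.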
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
}
let mut buf = BytesMut::new();
@@ -440,14 +462,16 @@ impl PageCache {
page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
return Err("could not apply WAL to reconstruct page image".into());
error!(
"could not apply WAL to reconstruct page image for GetPage@LSN request"
);
bail!("could not apply WAL to reconstruct page image");
}
};
self.put_page_image(tag, lsn, page_img.clone());
} else {
// No base image, and no WAL record. Huh?
panic!("no page image or WAL record for requested page");
bail!("no page image or WAL record for requested page");
}
// FIXME: assumes little-endian. Only used for the debugging log though


@@ -215,7 +215,7 @@ impl FeMessage {
///////////////////////////////////////////////////////////////////////////////
pub fn thread_main(conf: PageServerConf) {
pub fn thread_main(conf: &PageServerConf) {
// Create a new thread pool
//
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
@@ -260,7 +260,7 @@ impl Connection {
stream: BufWriter::new(socket),
buffer: BytesMut::with_capacity(10 * 1024),
init_done: false,
conf: conf,
conf,
}
}
@@ -459,7 +459,7 @@ impl Connection {
let _walreceiver_thread = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(move || {
walreceiver::thread_main(conf_copy, &connstr);
walreceiver::thread_main(&conf_copy, &connstr);
})
.unwrap();
@@ -504,7 +504,7 @@ impl Connection {
self.stream.write_i16(0).await?; /* numAttributes */
self.stream.flush().await?;
let pcache = page_cache::get_pagecache(self.conf.clone(), sysid);
let pcache = page_cache::get_pagecache(&self.conf, sysid);
loop {
let message = self.read_message().await?;
@@ -561,7 +561,7 @@ impl Connection {
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true,
n_blocks: n_blocks,
n_blocks,
}))
.await?
}


@@ -0,0 +1,11 @@
// From pg_tablespace_d.h
//
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664;
// Special values for non-rel files' tags
// TODO: maybe use an enum?
pub const PG_CONTROLFILE_FORKNUM: u32 = 42;
pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
pub const PG_XACT_FORKNUM: u32 = 44;
pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;
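// These pseudo-fork numbers let non-relation files (control file, filenode
// map, clog, multixact) reuse the relation-page keying in the page cache.
// An illustrative sketch, assuming the flat BufferTag shape used by the
// datadir-restore code below (forknum is stored as u8 there):
//
//     use crate::{page_cache, pg_constants};
//     let control_tag = page_cache::BufferTag {
//         spcnode: pg_constants::GLOBALTABLESPACE_OID,
//         dbnode: 0,
//         relnode: 0,
//         forknum: pg_constants::PG_CONTROLFILE_FORKNUM as u8,
//         blknum: 0,
//     };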


@@ -0,0 +1,339 @@
//
// Restore chunks from a local data directory
//
// This runs once at Page Server startup. It loads all the "base images" from
// a local directory (PGDATA_BASE_PATH) into the in-memory page cache. It also
// initializes the "last valid LSN" in the page cache to the LSN of the base
// image, so that when the WAL receiver is started, it starts streaming from
// that LSN.
//
use bytes::{Buf, BytesMut};
use log::*;
use regex::Regex;
use std::env;
use std::fmt;
use tokio::runtime;
use futures::future;
use crate::{page_cache, pg_constants, PageServerConf};
use std::fs;
use walkdir::WalkDir;
pub fn restore_main(conf: &PageServerConf) {
// Create a new thread pool
let runtime = runtime::Runtime::new().unwrap();
runtime.block_on(async {
let result = restore_chunk(conf).await;
match result {
Ok(_) => {
return;
}
Err(err) => {
error!("error: {}", err);
return;
}
}
});
}
async fn restore_chunk(conf: &PageServerConf) -> Result<(), FilePathError> {
let pgdata_base_path = env::var("PGDATA_BASE_PATH").unwrap();
info!("Restoring from local dir...");
let sys_id: u64 = 42;
let control_lsn = 0; // TODO: get it from sysid
let mut slurp_futures: Vec<_> = Vec::new();
for e in WalkDir::new(pgdata_base_path.clone()) {
let entry = e.unwrap();
if !entry.path().is_dir() {
let path = entry.path().to_str().unwrap();
let relpath = path
.strip_prefix(&format!("{}/", pgdata_base_path))
.unwrap();
info!(
"Restoring file {} relpath {}",
entry.path().display(),
relpath
);
let parsed = parse_rel_file_path(&relpath);
match parsed {
Ok(mut p) => {
p.lsn = control_lsn;
let f = slurp_base_file(conf, sys_id, path.to_string(), p);
slurp_futures.push(f);
}
Err(e) => {
warn!("unrecognized file: {} ({})", relpath, e);
}
};
}
}
let pcache = page_cache::get_pagecache(conf, sys_id);
pcache.init_valid_lsn(control_lsn);
info!("{} files to restore...", slurp_futures.len());
future::join_all(slurp_futures).await;
info!("restored!");
Ok(())
}
#[derive(Debug)]
struct FilePathError {
msg: String,
}
impl FilePathError {
fn new(msg: &str) -> FilePathError {
FilePathError {
msg: msg.to_string(),
}
}
}
impl From<core::num::ParseIntError> for FilePathError {
fn from(e: core::num::ParseIntError) -> Self {
return FilePathError {
msg: format!("invalid filename: {}", e),
};
}
}
impl fmt::Display for FilePathError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "invalid filename: {}", self.msg)
}
}
fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(0),
Some("fsm") => Ok(1),
Some("vm") => Ok(2),
Some("init") => Ok(3),
Some(_) => Err(FilePathError::new("invalid forkname")),
}
}
#[derive(Debug)]
struct ParsedBaseImageFileName {
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub forknum: u32,
pub segno: u32,
pub lsn: u64,
}
// formats:
// <oid>
// <oid>_<fork name>
// <oid>.<segment number>
// <oid>_<fork name>.<segment number>
fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
let caps = re
.captures(fname)
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let relnode_str = caps.name("relnode").unwrap().as_str();
let relnode = u32::from_str_radix(relnode_str, 10)?;
let forkname_match = caps.name("forkname");
let forkname = if forkname_match.is_none() {
None
} else {
Some(forkname_match.unwrap().as_str())
};
let forknum = forkname_to_forknum(forkname)?;
let segno_match = caps.name("segno");
let segno = if segno_match.is_none() {
0
} else {
u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
};
return Ok((relnode, forknum, segno, 0));
}
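// To make the four accepted filename shapes concrete, an illustrative sketch
// of the expected results (hypothetical relnode 16384; fsm = fork 1, vm = fork 2
// per forkname_to_forknum above). Tuples are (relnode, forknum, segno, lsn).
#[test]
fn parse_filename_shapes_sketch() {
    assert_eq!(parse_filename("16384").unwrap(), (16384, 0, 0, 0));
    assert_eq!(parse_filename("16384_fsm").unwrap(), (16384, 1, 0, 0));
    assert_eq!(parse_filename("16384.2").unwrap(), (16384, 0, 2, 0));
    assert_eq!(parse_filename("16384_vm.1").unwrap(), (16384, 2, 1, 0));
}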
fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
/*
* Relation data files can be in one of the following directories:
*
* global/
* shared relations
*
* base/<db oid>/
* regular relations, default tablespace
*
* pg_tblspc/<tblspc oid>/<tblspc version>/
* within a non-default tablespace (the name of the directory
* depends on version)
*
* And the relation data files themselves have a filename like:
*
* <oid>.<segment number>
*/
if let Some(fname) = path.strip_prefix("global/") {
if fname.contains("pg_control") {
return Ok(ParsedBaseImageFileName {
spcnode: pg_constants::GLOBALTABLESPACE_OID,
dbnode: 0,
relnode: 0,
forknum: pg_constants::PG_CONTROLFILE_FORKNUM,
segno: 0,
lsn: 0,
});
}
if fname.contains("pg_filenode") {
return Ok(ParsedBaseImageFileName {
spcnode: pg_constants::GLOBALTABLESPACE_OID,
dbnode: 0,
relnode: 0,
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
segno: 0,
lsn: 0,
});
}
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
return Ok(ParsedBaseImageFileName {
spcnode: pg_constants::GLOBALTABLESPACE_OID,
dbnode: 0,
relnode,
forknum,
segno,
lsn,
});
} else if let Some(dbpath) = path.strip_prefix("base/") {
let mut s = dbpath.split("/");
let dbnode_str = s
.next()
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let dbnode = u32::from_str_radix(dbnode_str, 10)?;
let fname = s
.next()
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
if s.next().is_some() {
return Err(FilePathError::new("invalid relation data file name"));
};
if fname.contains("pg_filenode") {
return Ok(ParsedBaseImageFileName {
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
dbnode: dbnode,
relnode: 0,
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
segno: 0,
lsn: 0,
});
}
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
return Ok(ParsedBaseImageFileName {
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
dbnode,
relnode,
forknum,
segno,
lsn,
});
} else if let Some(fname) = path.strip_prefix("pg_xact/") {
return Ok(ParsedBaseImageFileName {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: pg_constants::PG_XACT_FORKNUM,
segno: u32::from_str_radix(fname, 10).unwrap(),
lsn: 0,
});
} else if let Some(fname) = path.strip_prefix("pg_multixact/members/") {
return Ok(ParsedBaseImageFileName {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: pg_constants::PG_MXACT_MEMBERS_FORKNUM,
segno: u32::from_str_radix(fname, 10).unwrap(),
lsn: 0,
});
} else if let Some(fname) = path.strip_prefix("pg_multixact/offsets/") {
return Ok(ParsedBaseImageFileName {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: pg_constants::PG_MXACT_OFFSETS_FORKNUM,
segno: u32::from_str_radix(fname, 10).unwrap(),
lsn: 0,
});
} else if let Some(_) = path.strip_prefix("pg_tblspc/") {
// TODO
return Err(FilePathError::new("tablespaces not supported"));
} else {
return Err(FilePathError::new("invalid relation data file name"));
}
}
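// Same idea for full paths (illustrative, with made-up OIDs): an fsm fork
// under base/<db oid>/ maps to the default tablespace.
#[test]
fn parse_rel_file_path_sketch() {
    let p = parse_rel_file_path("base/13008/16384_fsm").unwrap();
    assert_eq!(p.spcnode, pg_constants::DEFAULTTABLESPACE_OID);
    assert_eq!((p.dbnode, p.relnode, p.forknum, p.segno), (13008, 16384, 1, 0));
}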
async fn slurp_base_file(
conf: &PageServerConf,
sys_id: u64,
file_path: String,
parsed: ParsedBaseImageFileName,
) {
info!("slurp_base_file local path {}", file_path);
let mut data = fs::read(file_path).unwrap();
// pg_filenode.map has a non-standard size, 512 bytes;
// enlarge it so it can be treated as a regular page
if parsed.forknum == pg_constants::PG_FILENODEMAP_FORKNUM {
data.resize(8192, 0);
}
let data_bytes: &[u8] = &data;
let mut bytes = BytesMut::from(data_bytes).freeze();
// FIXME: use constants (BLCKSZ)
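// A 1 GiB segment holds 1 GiB / 8 KiB = 131072 blocks, so segment N starts
// at block N * 131072.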
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
let pcache = page_cache::get_pagecache(conf, sys_id);
let reltag = page_cache::RelTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
};
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
blknum: blknum,
};
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
pcache.relsize_inc(&reltag, Some(blknum));
blknum += 1;
}
}


@@ -60,8 +60,8 @@ pub fn restore_main(conf: &PageServerConf) {
async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
let backend = Storage {
region: Region::Custom {
region: env::var("S3_REGION").unwrap().into(),
endpoint: env::var("S3_ENDPOINT").unwrap().into(),
region: env::var("S3_REGION").unwrap(),
endpoint: env::var("S3_ENDPOINT").unwrap(),
},
credentials: Credentials::new(
Some(&env::var("S3_ACCESSKEY").unwrap()),
@@ -119,7 +119,7 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
panic!("no base backup found");
}
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
let pcache = page_cache::get_pagecache(conf, sys_id);
pcache.init_valid_lsn(oldest_lsn);
info!("{} files to restore...", slurp_futures.len());
@@ -305,7 +305,7 @@ async fn slurp_base_file(
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
let pcache = page_cache::get_pagecache(conf, sys_id);
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
@@ -315,7 +315,7 @@ async fn slurp_base_file(
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
},
blknum: blknum,
blknum,
};
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));


@@ -14,7 +14,6 @@ use tui::text::{Span, Spans, Text};
use tui::widgets::{Block, BorderType, Borders, Paragraph, Widget};
use tui::Terminal;
use slog;
use slog::Drain;
lazy_static! {
@@ -188,6 +187,7 @@ pub fn ui_main<'b>() -> Result<(), Box<dyn Error>> {
Ok(())
}
#[allow(dead_code)]
struct LogWidget<'a> {
logger: &'a TuiLogger,
title: &'a str,


@@ -10,7 +10,6 @@ use std::time::Duration;
use termion::event::Key;
use termion::input::TermRead;
#[allow(dead_code)]
pub enum Event<I> {
Input(I),
Tick,


@@ -10,7 +10,6 @@
//
use chrono::offset::Local;
use chrono::DateTime;
use slog;
use slog::{Drain, Level, OwnedKVList, Record};
use slog_async::AsyncRecord;
use std::collections::VecDeque;
@@ -81,7 +80,7 @@ impl<'b> TuiLoggerWidget<'b> {
style_trace: None,
style_info: None,
show_module: true,
logger: logger,
logger,
}
}
}


@@ -44,6 +44,7 @@ struct XLogLongPageHeaderData {
#[allow(non_upper_case_globals)]
const SizeOfXLogLongPHD: usize = (2 + 2 + 4 + 8 + 4) + 4 + 8 + 4 + 4;
#[allow(dead_code)]
pub struct WalStreamDecoder {
lsn: u64,
@@ -63,7 +64,7 @@ pub struct WalStreamDecoder {
impl WalStreamDecoder {
pub fn new(lsn: u64) -> WalStreamDecoder {
WalStreamDecoder {
lsn: lsn,
lsn,
startlsn: 0,
contlen: 0,
@@ -253,6 +254,7 @@ const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
#[allow(dead_code)]
pub struct DecodedBkpBlock {
/* Is this block ref in use? */
//in_use: bool,


@@ -1,28 +1,30 @@
//
// WAL receiver
//
// The WAL receiver connects to the WAL safekeeper service, and streams WAL.
// For each WAL record, it decodes the record to figure out which data blocks
// the record affects, and adds the records to the page cache.
//
use log::*;
use tokio::runtime;
use tokio::time::{sleep, Duration};
use tokio_stream::StreamExt;
//!
//! WAL receiver
//!
//! The WAL receiver connects to the WAL safekeeper service, and streams WAL.
//! For each WAL record, it decodes the record to figure out which data blocks
//! the record affects, and adds the records to the page cache.
//!
use crate::page_cache;
use crate::page_cache::{BufferTag, RelTag};
use crate::waldecoder::*;
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use anyhow::Error;
use log::*;
use postgres_protocol::message::backend::ReplicationMessage;
use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
use postgres_types::PgLsn;
use std::str::FromStr;
use tokio::runtime;
use tokio::time::{sleep, Duration};
use tokio_postgres::replication::{PgTimestamp, ReplicationStream};
use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow};
use tokio_stream::StreamExt;
//
// This is the entry point for the WAL receiver thread.
//
pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
pub fn thread_main(conf: &PageServerConf, wal_producer_connstr: &str) {
info!("WAL receiver thread started: '{}'", wal_producer_connstr);
let runtime = runtime::Builder::new_current_thread()
@@ -32,31 +34,25 @@ pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
runtime.block_on(async {
loop {
let _res = walreceiver_main(conf.clone(), wal_producer_connstr).await;
let res = walreceiver_main(conf, wal_producer_connstr).await;
// TODO: print/log the error
info!(
"WAL streaming connection failed, retrying in 1 second...: {:?}",
_res
);
sleep(Duration::from_secs(1)).await;
if let Err(e) = res {
info!(
"WAL streaming connection failed ({}), retrying in 1 second",
e
);
sleep(Duration::from_secs(1)).await;
}
}
});
}
async fn walreceiver_main(
conf: PageServerConf,
wal_producer_connstr: &String,
) -> Result<(), Error> {
async fn walreceiver_main(conf: &PageServerConf, wal_producer_connstr: &str) -> Result<(), Error> {
// Connect to the database in replication mode.
debug!("connecting to {}...", wal_producer_connstr);
let (mut rclient, connection) = connect_replication(
wal_producer_connstr.as_str(),
NoTls,
ReplicationMode::Physical,
)
.await?;
debug!("connected!");
info!("connecting to {:?}", wal_producer_connstr);
let connect_cfg = format!("{} replication=true", wal_producer_connstr);
let (rclient, connection) = tokio_postgres::connect(&connect_cfg, NoTls).await?;
info!("connected!");
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
@@ -66,28 +62,28 @@ async fn walreceiver_main(
}
});
let identify_system = rclient.identify_system().await?;
let end_of_wal = u64::from(identify_system.xlogpos());
let identify = identify_system(&rclient).await?;
info!("{:?}", identify);
let end_of_wal = u64::from(identify.xlogpos);
let mut caught_up = false;
let sysid: u64 = identify_system.systemid().parse().unwrap();
let pcache = page_cache::get_pagecache(conf, sysid);
let pcache = page_cache::get_pagecache(conf, identify.systemid);
//
// Start streaming the WAL, from where we left off previously.
//
let mut startpoint = pcache.get_last_valid_lsn();
if startpoint == 0 {
// If we start here with identify_system.xlogpos() we will have race condition with
// If we start here with identify.xlogpos we will have race condition with
// postgres start: insert into postgres may request page that was modified with lsn
// smaller than identify_system.xlogpos().
// smaller than identify.xlogpos.
//
// Current procedure for starting postgres will anyway be changed to something
// different, like having an 'initdb' method on the pageserver (or importing some
// shared empty database snapshot), so for now just start at the beginning of the
// first segment, which seems to be a valid record.
pcache.init_valid_lsn(0x_1_000_000_u64);
startpoint = u64::from(0x_1_000_000_u64);
startpoint = 0x_1_000_000_u64;
} else {
// There might be some padding after the last full record, skip it.
//
@@ -105,10 +101,14 @@ async fn walreceiver_main(
(end_of_wal >> 32),
(end_of_wal & 0xffffffff)
);
let startpoint = tokio_postgres::types::Lsn::from(startpoint);
let mut physical_stream = rclient
.start_physical_replication(None, startpoint, None)
.await?;
let startpoint = PgLsn::from(startpoint);
let query = format!("START_REPLICATION PHYSICAL {}", startpoint);
let copy_stream = rclient.copy_both_simple::<bytes::Bytes>(&query).await?;
let physical_stream = ReplicationStream::new(copy_stream);
tokio::pin!(physical_stream);
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
while let Some(replication_message) = physical_stream.next().await {
@@ -132,8 +132,7 @@ async fn walreceiver_main(
loop {
if let Some((lsn, recdata)) = waldecoder.poll_decode() {
let decoded =
crate::waldecoder::decode_wal_record(startlsn, recdata.clone());
let decoded = decode_wal_record(startlsn, recdata.clone());
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifies. (The actual WAL record is kept in
@@ -151,7 +150,7 @@ async fn walreceiver_main(
};
let rec = page_cache::WALRecord {
lsn: lsn,
lsn,
will_init: blk.will_init || blk.apply_image,
truncate: false,
rec: recdata.clone(),
@@ -209,12 +208,81 @@ async fn walreceiver_main(
}
}
ReplicationMessage::PrimaryKeepAlive(_keepalive) => {
trace!("received PrimaryKeepAlive");
// FIXME: Reply, or the connection will time out
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
let wal_end = keepalive.wal_end();
let timestamp = keepalive.timestamp();
let reply_requested: bool = keepalive.reply() != 0;
trace!(
"received PrimaryKeepAlive(wal_end: {}, timestamp: {} reply: {})",
wal_end,
timestamp,
reply_requested,
);
if reply_requested {
// TODO: More thought should go into what values are sent here.
let last_lsn = PgLsn::from(pcache.get_last_valid_lsn());
let write_lsn = last_lsn;
let flush_lsn = last_lsn;
let apply_lsn = PgLsn::INVALID;
let ts = PgTimestamp::now()?;
const NO_REPLY: u8 = 0u8;
physical_stream
.as_mut()
.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)
.await?;
}
}
_ => (),
}
}
return Ok(());
}
/// Data returned from the postgres `IDENTIFY_SYSTEM` command
///
/// See the [postgres docs] for more details.
///
/// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
#[derive(Debug)]
pub struct IdentifySystem {
systemid: u64,
timeline: u32,
xlogpos: PgLsn,
dbname: Option<String>,
}
/// There was a problem parsing the response to
/// a postgres IDENTIFY_SYSTEM command.
#[derive(Debug, thiserror::Error)]
#[error("IDENTIFY_SYSTEM parse error")]
pub struct IdentifyError;
/// Run the postgres `IDENTIFY_SYSTEM` command
pub async fn identify_system(client: &tokio_postgres::Client) -> Result<IdentifySystem, Error> {
let query_str = "IDENTIFY_SYSTEM";
let response = client.simple_query(query_str).await?;
// get(N) from row, then parse it as some destination type.
fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
where
T: FromStr,
{
let val = row.get(idx).ok_or(IdentifyError)?;
val.parse::<T>().or(Err(IdentifyError))
}
// Extract the row contents into an IdentifySystem struct,
// using the get_parse helper above so ? works on each field.
if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) {
Ok(IdentifySystem {
systemid: get_parse(first_row, 0)?,
timeline: get_parse(first_row, 1)?,
xlogpos: get_parse(first_row, 2)?,
dbname: get_parse(first_row, 3).ok(),
})
} else {
Err(IdentifyError)?
}
}


@@ -43,7 +43,7 @@ static TIMEOUT: Duration = Duration::from_secs(20);
//
// Main entry point for the WAL applicator thread.
//
pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) {
pub fn wal_redo_main(conf: &PageServerConf, sys_id: u64) {
info!("WAL redo thread started {}", sys_id);
// We block on waiting for requests on the walredo request channel, but
@@ -54,7 +54,7 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) {
.build()
.unwrap();
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
let pcache = page_cache::get_pagecache(conf, sys_id);
// Loop forever, handling requests as they come.
let walredo_channel_receiver = &pcache.walredo_receiver;
@@ -215,7 +215,7 @@ impl WalRedoProcess {
tokio::spawn(f_stderr);
Ok(WalRedoProcess {
child: child,
child,
stdin: RefCell::new(stdin),
stdout: RefCell::new(stdout),
})


@@ -10,6 +10,10 @@
#
# 2) installs postgres to REPO_ROOT/tmp_install/
#
# Halt immediately if any command fails
set -e
REPO_ROOT=$(dirname "$0")
REPO_ROOT="`( cd \"$REPO_ROOT\" && pwd )`"


@@ -26,13 +26,11 @@ clap = "2.33.0"
termion = "1.5.6"
tui = "0.14.0"
daemonize = "0.4.1"
rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] }
rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", rev="7f15a24ec7daa0a5d9516da706212745f9042818", features = ["no-verify-ssl"] }
tokio = { version = "1.3.0", features = ["full"] }
tokio-stream = { version = "0.1.4" }
tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
anyhow = "1.0"
crc32c = "0.6.0"
pageserver = { path = "../pageserver" }


@@ -11,10 +11,7 @@ use std::{fs::File, fs::OpenOptions};
use clap::{App, Arg};
use slog;
use slog::Drain;
use slog_scope;
use slog_stdlog;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
@@ -92,7 +89,7 @@ fn main() -> Result<(), io::Error> {
fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<(), io::Error> {
// Initialize logger
let _scope_guard = init_logging(&conf);
let _scope_guard = init_logging(&conf)?;
let _log_guard = slog_stdlog::init().unwrap();
// Note: this `info!(...)` macro comes from `log` crate
info!("standard logging redirected to slog");
@@ -141,20 +138,24 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<(), io::Error> {
Ok(())
}
fn init_logging(conf: &WalAcceptorConf) -> slog_scope::GlobalLoggerGuard {
fn init_logging(conf: &WalAcceptorConf) -> Result<slog_scope::GlobalLoggerGuard, io::Error> {
if conf.daemonize {
let log = conf.data_dir.join("wal_acceptor.log");
let log_file = File::create(log).unwrap_or_else(|_| panic!("Could not create log file"));
let log_file = File::create(&log).map_err(|err| {
// We failed to initialize logging, so we can't log this message with error!
eprintln!("Could not create log file {:?}: {}", log, err);
err
})?;
let decorator = slog_term::PlainSyncDecorator::new(log_file);
let drain = slog_term::CompactFormat::new(decorator).build();
let drain = std::sync::Mutex::new(drain).fuse();
let logger = slog::Logger::root(drain, slog::o!());
slog_scope::set_global_logger(logger)
Ok(slog_scope::set_global_logger(logger))
} else {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).chan_size(1000).build().fuse();
let logger = slog::Logger::root(drain, slog::o!());
return slog_scope::set_global_logger(logger);
Ok(slog_scope::set_global_logger(logger))
}
}


@@ -6,7 +6,6 @@ mod pq_protocol;
pub mod wal_service;
pub mod xlog_utils;
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct WalAcceptorConf {
pub data_dir: PathBuf,


@@ -402,7 +402,7 @@ impl System {
},
};
System {
id: id,
id,
mutex: Mutex::new(shared_state),
cond: Notify::new(),
}
@@ -989,7 +989,7 @@ impl Connection {
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => {
return Err(e.into());
return Err(e);
}
}
@@ -1018,12 +1018,19 @@ impl Connection {
Ok(opened_file) => file = opened_file,
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e.into());
return Err(e);
}
}
}
}
let send_size = min((end_pos - start_pos) as usize, MAX_SEND_SIZE);
let xlogoff = XLogSegmentOffset(start_pos, wal_seg_size) as usize;
// How much to read and send in message? We cannot cross the WAL file
// boundary, and we don't want to send more than MAX_SEND_SIZE.
let send_size = (end_pos - start_pos) as usize;
let send_size = min(send_size, wal_seg_size - xlogoff);
let send_size = min(send_size, MAX_SEND_SIZE);
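// E.g. with start_pos 8 KiB short of a segment boundary, send_size is capped
// at those 8 KiB even if end_pos is much further ahead.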
let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size;
let data_start = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE;
let data_end = data_start + send_size;
@@ -1130,7 +1137,7 @@ impl Connection {
}
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e.into());
return Err(e);
}
}
}

zenith/Cargo.toml Normal file

@@ -0,0 +1,11 @@
[package]
name = "zenith"
version = "0.1.0"
authors = ["Stas Kelvich <stas@zenith.tech>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = "2.33.0"
control_plane = { path = "../control_plane" }

zenith/src/main.rs Normal file

@@ -0,0 +1,134 @@
use clap::{App, Arg, ArgMatches, SubCommand};
use std::error;
use std::process::exit;
use control_plane::{compute::ComputeControlPlane, local_env, storage};
type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
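// Expected flow with this CLI, as wired up below (illustrative):
//
//     zenith init             # initialize the ~/.zenith environment
//     zenith start            # start the pageserver
//     zenith pg create        # create a new postgres node
//     zenith pg list          # NODE / ADDRESS / STATUS table
//     zenith pg start <name>  # start the named node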
fn main() {
let name_arg = Arg::with_name("NAME")
.short("n")
.index(1)
.help("name of this postgres instance")
.required(true);
let matches = App::new("zenith")
.subcommand(SubCommand::with_name("init"))
.subcommand(SubCommand::with_name("start"))
.subcommand(SubCommand::with_name("stop"))
.subcommand(SubCommand::with_name("status"))
.subcommand(
SubCommand::with_name("pg")
.about("Manage postgres instances")
.subcommand(
SubCommand::with_name("create"), // .arg(name_arg.clone()
// .required(false)
// .help("name of this postgres instance (will be pgN if omitted)"))
)
.subcommand(SubCommand::with_name("list"))
.subcommand(SubCommand::with_name("start").arg(name_arg.clone()))
.subcommand(SubCommand::with_name("stop").arg(name_arg.clone()))
.subcommand(SubCommand::with_name("destroy").arg(name_arg.clone())),
)
.subcommand(
SubCommand::with_name("snapshot")
.about("Manage database snapshots")
.subcommand(SubCommand::with_name("create"))
.subcommand(SubCommand::with_name("start"))
.subcommand(SubCommand::with_name("stop"))
.subcommand(SubCommand::with_name("destroy")),
)
.get_matches();
// handle init separately and exit
if let Some("init") = matches.subcommand_name() {
match local_env::init() {
Ok(_) => {
println!("Initialization complete! You may start zenith with 'zenith start' now.");
exit(0);
}
Err(e) => {
eprintln!("Error during init: {}", e);
exit(1);
}
}
}
// all other commands would need config
let env = match local_env::load_config() {
Ok(conf) => conf,
Err(e) => {
eprintln!("Error loading config from ~/.zenith: {}", e);
exit(1);
}
};
match matches.subcommand() {
("init", Some(_)) => {
panic!() /* Should not happen. Init was handled before */
}
("start", Some(_sub_m)) => {
let pageserver = storage::PageServerNode::from_env(&env);
if let Err(e) = pageserver.start() {
eprintln!("pageserver start: {}", e);
exit(1);
}
}
("stop", Some(_sub_m)) => {
let pageserver = storage::PageServerNode::from_env(&env);
if let Err(e) = pageserver.stop() {
eprintln!("pageserver stop: {}", e);
exit(1);
}
}
("status", Some(_sub_m)) => {}
("pg", Some(pg_match)) => {
if let Err(e) = handle_pg(pg_match, &env) {
eprintln!("pg operation failed: {}", e);
exit(1);
}
}
_ => {}
}
}
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let mut cplane = ComputeControlPlane::load(env.clone())?;
match pg_match.subcommand() {
("create", Some(_sub_m)) => {
cplane.new_node()?;
}
("list", Some(_sub_m)) => {
println!("NODE\tADDRESS\tSTATUS");
for (node_name, node) in cplane.nodes.iter() {
println!("{}\t{}\t{}", node_name, node.address, node.status());
}
}
("start", Some(sub_m)) => {
let name = sub_m.value_of("NAME").unwrap();
let node = cplane
.nodes
.get(name)
.ok_or(format!("postgres {} not found", name))?;
node.start()?;
}
("stop", Some(sub_m)) => {
let name = sub_m.value_of("NAME").unwrap();
let node = cplane
.nodes
.get(name)
.ok_or(format!("postgres {} not found", name))?;
node.stop()?;
}
_ => {}
}
Ok(())
}