Introduce authentication v0.1.

Current state with authentication.
Page server validates JWT token passed as a password during connection
phase and later when performing an action such as create branch tenant
parameter of an operation is validated to match one submitted in token.
To allow access from console there is dedicated scope: PageServerApi,
this scope allows access to all tenants. See code for access validation in:
PageServerHandler::check_permission.

Because we are in progress of refactoring of communication layer
involving wal proposer protocol, and safekeeper<->pageserver. Safekeeper
now doesn’t check token passed from compute, and uses “hardcoded” token
passed via environment variable to communicate with pageserver.

Compute postgres now takes token from environment variable and passes it
as a password field in pageserver connection. It is not passed through
settings because then user will be able to retrieve it using pg_settings
or SHOW ..

I’ve added basic test in test_auth.py. Probably after we add
authentication to remaining network paths we should enable it by default
and switch all existing tests to use it.
This commit is contained in:
Dmitry Rodionov
2021-08-05 02:20:26 +03:00
committed by Dmitry
parent 5f0fd093d7
commit ce5333656f
35 changed files with 1141 additions and 460 deletions

90
Cargo.lock generated
View File

@@ -107,6 +107,12 @@ dependencies = [
"anyhow",
]
[[package]]
name = "base64"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff"
[[package]]
name = "base64"
version = "0.13.0"
@@ -826,6 +832,20 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jsonwebtoken"
version = "7.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afabcc15e437a6484fc4f12d0fd63068fe457bf93f1c148d3d9649c60b103f32"
dependencies = [
"base64 0.12.3",
"pem",
"ring",
"serde",
"serde_json",
"simple_asn1",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@@ -1019,6 +1039,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@@ -1178,6 +1209,17 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "pem"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd56cbd21fea48d0c440b41cd69c589faacade08c992d9a54e471b79d0fd13eb"
dependencies = [
"base64 0.13.0",
"once_cell",
"regex",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
@@ -1240,7 +1282,7 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3"
dependencies = [
"base64",
"base64 0.13.0",
"byteorder",
"bytes",
"fallible-iterator",
@@ -1257,7 +1299,7 @@ name = "postgres-protocol"
version = "0.6.1"
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858#9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858"
dependencies = [
"base64",
"base64 0.13.0",
"byteorder",
"bytes",
"fallible-iterator",
@@ -1486,7 +1528,7 @@ version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22"
dependencies = [
"base64",
"base64 0.13.0",
"bytes",
"encoding_rs",
"futures-core",
@@ -1515,6 +1557,21 @@ dependencies = [
"winreg",
]
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi",
]
[[package]]
name = "rocksdb"
version = "0.16.0"
@@ -1545,7 +1602,7 @@ dependencies = [
"async-trait",
"aws-creds",
"aws-region",
"base64",
"base64 0.13.0",
"cfg-if 1.0.0",
"chrono",
"futures",
@@ -1740,6 +1797,17 @@ dependencies = [
"libc",
]
[[package]]
name = "simple_asn1"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692ca13de57ce0613a363c8c2f1de925adebc81b04c923ac60c5488bb44abe4b"
dependencies = [
"chrono",
"num-bigint",
"num-traits",
]
[[package]]
name = "siphasher"
version = "0.3.5"
@@ -1821,6 +1889,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "stringprep"
version = "0.1.2"
@@ -2157,6 +2231,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.2.2"
@@ -2443,8 +2523,10 @@ dependencies = [
"bincode",
"byteorder",
"bytes",
"hex",
"hex-literal",
"hyper",
"jsonwebtoken",
"lazy_static",
"log",
"postgres",

View File

@@ -15,10 +15,11 @@ use anyhow::{Context, Result};
use lazy_static::lazy_static;
use regex::Regex;
use zenith_utils::connstring::connection_host_port;
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::ZTimelineId;
use crate::local_env::LocalEnv;
use pageserver::{ZTenantId, ZTimelineId};
use crate::storage::PageServerNode;
//
@@ -103,7 +104,7 @@ impl ComputeControlPlane {
tenantid,
});
node.init_from_page_server()?;
node.init_from_page_server(self.env.auth_type)?;
self.nodes
.insert((tenantid, node.name.clone()), Arc::clone(&node));
@@ -247,7 +248,7 @@ impl PostgresNode {
// Connect to a page server, get base backup, and untar it to initialize a
// new data directory
pub fn init_from_page_server(&self) -> Result<()> {
pub fn init_from_page_server(&self, auth_type: AuthType) -> Result<()> {
let pgdata = self.pgdata();
println!(
@@ -322,18 +323,27 @@ impl PostgresNode {
// Connect it to the page server.
// set up authentication
let password = if let AuthType::ZenithJWT = auth_type {
"$ZENITH_AUTH_TOKEN"
} else {
""
};
// Configure that node to take pages from pageserver
let (host, port) = connection_host_port(&self.pageserver.connection_config());
let (host, port) = connection_host_port(&self.pageserver.connection_config);
self.append_conf(
"postgresql.conf",
format!(
concat!(
"shared_preload_libraries = zenith\n",
"zenith.page_server_connstring = 'host={} port={}'\n",
// $ZENITH_AUTH_TOKEN will be replaced with value from environment variable during compute pg startup
// it is done this way because otherwise user will be able to retrieve the value using SHOW command or pg_settings
"zenith.page_server_connstring = 'host={} port={} password={}'\n",
"zenith.zenith_timeline='{}'\n",
"zenith.zenith_tenant='{}'\n",
),
host, port, self.timelineid, self.tenantid,
host, port, password, self.timelineid, self.tenantid,
)
.as_str(),
)?;
@@ -368,45 +378,48 @@ impl PostgresNode {
Ok(())
}
fn pg_ctl(&self, args: &[&str]) -> Result<()> {
fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");
let mut cmd = Command::new(pg_ctl_path);
cmd.args(
[
&[
"-D",
self.pgdata().to_str().unwrap(),
"-l",
self.pgdata().join("pg.log").to_str().unwrap(),
"-w", //wait till pg_ctl actually does what was asked
],
args,
]
.concat(),
)
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
if let Some(token) = auth_token {
cmd.env("ZENITH_AUTH_TOKEN", token);
}
let pg_ctl = cmd.status().with_context(|| "pg_ctl failed")?;
let pg_ctl = Command::new(pg_ctl_path)
.args(
[
&[
"-D",
self.pgdata().to_str().unwrap(),
"-l",
self.pgdata().join("pg.log").to_str().unwrap(),
"-w", //wait till pg_ctl actually does what was asked
],
args,
]
.concat(),
)
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.with_context(|| "pg_ctl failed")?;
if !pg_ctl.success() {
anyhow::bail!("pg_ctl failed");
}
Ok(())
}
pub fn start(&self) -> Result<()> {
pub fn start(&self, auth_token: &Option<String>) -> Result<()> {
println!("Starting postgres node at '{}'", self.connstr());
self.pg_ctl(&["start"])
self.pg_ctl(&["start"], auth_token)
}
pub fn restart(&self) -> Result<()> {
self.pg_ctl(&["restart"])
pub fn restart(&self, auth_token: &Option<String>) -> Result<()> {
self.pg_ctl(&["restart"], auth_token)
}
pub fn stop(&self, destroy: bool) -> Result<()> {
self.pg_ctl(&["-m", "immediate", "stop"])?;
self.pg_ctl(&["-m", "immediate", "stop"], &None)?;
if destroy {
println!(
"Destroying postgres data directory '{}'",

View File

@@ -4,14 +4,17 @@
// Now it also provides init method which acts like a stub for proper installation
// script which will use local paths.
//
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use hex;
use pageserver::ZTenantId;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::{collections::BTreeMap, env};
use url::Url;
use zenith_utils::auth::{encode_from_key_path, Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
pub type Remotes = BTreeMap<String, String>;
@@ -39,6 +42,15 @@ pub struct LocalEnv {
#[serde(with = "hex")]
pub tenantid: ZTenantId,
// jwt auth token used for communication with pageserver
pub auth_token: String,
// used to determine which auth type is used
pub auth_type: AuthType,
// used to issue tokens during e.g pg start
pub private_key_path: PathBuf,
pub remotes: Remotes,
}
@@ -85,7 +97,11 @@ fn base_path() -> PathBuf {
//
// Initialize a new Zenith repository
//
pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()> {
pub fn init(
remote_pageserver: Option<&str>,
tenantid: ZTenantId,
auth_type: AuthType,
) -> Result<()> {
// check if config already exists
let base_path = base_path();
if base_path.exists() {
@@ -94,6 +110,7 @@ pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()>
base_path.to_str().unwrap()
);
}
fs::create_dir(&base_path)?;
// ok, now check that expected binaries are present
@@ -110,6 +127,44 @@ pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()>
anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
}
// generate keys for jwt
// openssl genrsa -out private_key.pem 2048
let private_key_path = base_path.join("auth_private_key.pem");
let keygen_output = Command::new("openssl")
.arg("genrsa")
.args(&["-out", private_key_path.to_str().unwrap()])
.arg("2048")
.stdout(Stdio::null())
.output()
.with_context(|| "failed to generate auth private key")?;
if !keygen_output.status.success() {
anyhow::bail!(
"openssl failed: '{}'",
String::from_utf8_lossy(&keygen_output.stderr)
);
}
let public_key_path = base_path.join("auth_public_key.pem");
// openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem
let keygen_output = Command::new("openssl")
.arg("rsa")
.args(&["-in", private_key_path.to_str().unwrap()])
.arg("-pubout")
.args(&["-outform", "PEM"])
.args(&["-out", public_key_path.to_str().unwrap()])
.stdout(Stdio::null())
.output()
.with_context(|| "failed to generate auth private key")?;
if !keygen_output.status.success() {
anyhow::bail!(
"openssl failed: '{}'",
String::from_utf8_lossy(&keygen_output.stderr)
);
}
let auth_token =
encode_from_key_path(&Claims::new(None, Scope::PageServerApi), &private_key_path)?;
let conf = if let Some(addr) = remote_pageserver {
// check that addr is parsable
let _uri = Url::parse(addr).map_err(|e| anyhow!("{}: {}", addr, e))?;
@@ -121,6 +176,9 @@ pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()>
base_data_dir: base_path,
remotes: BTreeMap::default(),
tenantid,
auth_token,
auth_type,
private_key_path,
}
} else {
// Find zenith binaries.
@@ -136,6 +194,9 @@ pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()>
base_data_dir: base_path,
remotes: BTreeMap::default(),
tenantid,
auth_token,
auth_type,
private_key_path,
}
};

View File

@@ -8,8 +8,9 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Result};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::ZTenantId;
use postgres::{Config, NoTls};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
use crate::local_env::LocalEnv;
use crate::read_pidfile;
@@ -24,33 +25,32 @@ use zenith_utils::connstring::connection_address;
#[derive(Debug)]
pub struct PageServerNode {
pub kill_on_exit: bool,
pub connection_config: Option<Config>,
pub connection_config: Config,
pub env: LocalEnv,
}
impl PageServerNode {
pub fn from_env(env: &LocalEnv) -> PageServerNode {
let password = if matches!(env.auth_type, AuthType::ZenithJWT) {
&env.auth_token
} else {
""
};
PageServerNode {
kill_on_exit: false,
connection_config: None, // default
connection_config: Self::default_config(password), // default
env: env.clone(),
}
}
fn default_config() -> Config {
"postgresql://no_user@localhost:64000/no_db"
fn default_config(password: &str) -> Config {
format!("postgresql://no_user:{}@localhost:64000/no_db", password)
.parse()
.unwrap()
}
pub fn connection_config(&self) -> Config {
match &self.connection_config {
Some(config) => config.clone(),
None => Self::default_config(),
}
}
pub fn init(&self, create_tenant: Option<&str>) -> Result<()> {
pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let mut args = vec![
"--init",
@@ -59,6 +59,12 @@ impl PageServerNode {
"--postgres-distrib",
self.env.pg_distrib_dir.to_str().unwrap(),
];
if enable_auth {
args.extend(&["--auth-validation-public-key-path", "auth_public_key.pem"]);
args.extend(&["--auth-type", "ZenithJWT"]);
}
create_tenant.map(|tenantid| args.extend(&["--create-tenant", tenantid]));
let status = cmd
.args(args)
@@ -85,7 +91,7 @@ impl PageServerNode {
pub fn start(&self) -> Result<()> {
println!(
"Starting pageserver at '{}' in {}",
connection_address(&self.connection_config()),
connection_address(&self.connection_config),
self.repo_path().display()
);
@@ -105,18 +111,21 @@ impl PageServerNode {
// It takes a while for the page server to start up. Wait until it is
// open for business.
for retries in 1..15 {
let client = self.page_server_psql_client();
if client.is_ok() {
break;
} else {
println!("Pageserver not responding yet, retrying ({})...", retries);
thread::sleep(Duration::from_secs(1));
match self.page_server_psql_client() {
Ok(_) => {
println!("Pageserver started");
return Ok(());
}
Err(err) => {
println!(
"Pageserver not responding yet, err {} retrying ({})...",
err, retries
);
thread::sleep(Duration::from_secs(1));
}
}
}
println!("Pageserver started");
Ok(())
bail!("pageserver failed to start");
}
pub fn stop(&self) -> Result<()> {
@@ -127,7 +136,7 @@ impl PageServerNode {
}
// wait for pageserver stop
let address = connection_address(&self.connection_config());
let address = connection_address(&self.connection_config);
for _ in 0..5 {
let stream = TcpStream::connect(&address);
thread::sleep(Duration::from_secs(1));
@@ -142,14 +151,14 @@ impl PageServerNode {
}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let mut client = self.connection_config().connect(NoTls).unwrap();
let mut client = self.connection_config.connect(NoTls).unwrap();
println!("Pageserver query: '{}'", sql);
client.simple_query(sql).unwrap()
}
pub fn page_server_psql_client(&self) -> Result<postgres::Client, postgres::Error> {
self.connection_config().connect(NoTls)
self.connection_config.connect(NoTls)
}
pub fn tenants_list(&self) -> Result<Vec<String>> {

30
docs/authentication.md Normal file
View File

@@ -0,0 +1,30 @@
## Authentication
### Overview
Current state of authentication includes usage of JWT tokens in communication between compute and pageserver and between CLI and pageserver. JWT token is signed using RSA keys. CLI generates a key pair during call to `zenith init`. Using following openssl commands:
```bash
openssl genrsa -out private_key.pem 2048
openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem
```
CLI also generates signed token and saves it in the config for later access to pageserver. Now authentication is optional. Pageserver has two variables in config: `auth_validation_public_key_path` and `auth_type`, so when auth type present and set to `ZenithJWT` pageserver will require authentication for connections. Actual JWT is passed in password field of connection string. There is a caveat for psql, it silently truncates passwords to 100 symbols, so to correctly pass JWT via psql you have to either use PGPASSWORD environment variable, or store password in psql config file.
Currently there is no authentication between compute and safekeepers, because this communication layer is under heavy refactoring. After this refactoring support for authentication will be added there too. Now safekeeper supports "hardcoded" token passed via environment variable to be able to use callmemaybe command in pageserver.
Compute uses token passed via environment variable to communicate to pageserver and in the future to the safekeeper too.
JWT authentication now supports two scopes: tenant and pageserverapi. Tenant scope is intended for use in tenant related api calls, e.g. create_branch. Compute launched for particular tenant also uses this scope. Scope pageserver api is intended to be used by console to manage pageserver. For now we have only one management operation - create tenant.
Examples for token generation in python:
```python
# generate pageserverapi token
management_token = jwt.encode({"scope": "pageserverapi"}, auth_keys.priv, algorithm="RS256")
# generate tenant token
tenant_token = jwt.encode({"scope": "tenant", "tenant_id": ps.initial_tenant}, auth_keys.priv, algorithm="RS256")
```
Utility functions to work with jwts in rust are located in zenith_utils/src/auth.rs

View File

@@ -9,11 +9,14 @@ use std::{
net::TcpListener,
path::{Path, PathBuf},
process::exit,
str::FromStr,
sync::Arc,
thread,
time::Duration,
};
use zenith_utils::{auth::JwtAuth, postgres_backend::AuthType};
use anyhow::Result;
use anyhow::{ensure, Result};
use clap::{App, Arg, ArgMatches};
use daemonize::Daemonize;
@@ -36,6 +39,8 @@ struct CfgFileParams {
gc_horizon: Option<String>,
gc_period: Option<String>,
pg_distrib_dir: Option<String>,
auth_validation_public_key_path: Option<String>,
auth_type: Option<String>,
}
impl CfgFileParams {
@@ -51,6 +56,8 @@ impl CfgFileParams {
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
pg_distrib_dir: get_arg("postgres-distrib"),
auth_validation_public_key_path: get_arg("auth-validation-public-key-path"),
auth_type: get_arg("auth-type"),
}
}
@@ -63,11 +70,17 @@ impl CfgFileParams {
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
auth_validation_public_key_path: self
.auth_validation_public_key_path
.or(other.auth_validation_public_key_path),
auth_type: self.auth_type.or(other.auth_type),
}
}
/// Create a PageServerConf from these string parameters
fn try_into_config(&self) -> Result<PageServerConf> {
let workdir = PathBuf::from(".");
let listen_addr = match self.listen_addr.as_ref() {
Some(addr) => addr.clone(),
None => DEFAULT_LISTEN_ADDR.to_owned(),
@@ -92,10 +105,34 @@ impl CfgFileParams {
None => env::current_dir()?.join("tmp_install"),
};
let auth_validation_public_key_path = self
.auth_validation_public_key_path
.as_ref()
.map(PathBuf::from);
let auth_type = self
.auth_type
.as_ref()
.map_or(Ok(AuthType::Trust), |auth_type| {
AuthType::from_str(&auth_type)
})?;
if !pg_distrib_dir.join("bin/postgres").exists() {
anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
}
if auth_type == AuthType::ZenithJWT {
ensure!(
auth_validation_public_key_path.is_some(),
"Missing auth_validation_public_key_path when auth_type is ZenithJWT"
);
let path_ref = auth_validation_public_key_path.as_ref().unwrap();
ensure!(
path_ref.exists(),
format!("Can't find auth_validation_public_key at {:?}", path_ref)
);
}
Ok(PageServerConf {
daemonize: false,
@@ -106,9 +143,13 @@ impl CfgFileParams {
superuser: String::from(DEFAULT_SUPERUSER),
workdir: PathBuf::from("."),
workdir,
pg_distrib_dir,
auth_validation_public_key_path,
auth_type,
})
}
}
@@ -168,6 +209,18 @@ fn main() -> Result<()> {
.help("Create tenant during init")
.requires("init"),
)
.arg(
Arg::with_name("auth-validation-public-key-path")
.long("auth-validation-public-key-path")
.takes_value(true)
.help("Path to public key used to validate jwt signature"),
)
.arg(
Arg::with_name("auth-type")
.long("auth-type")
.takes_value(true)
.help("Authentication scheme type. One of: Trust, MD5, ZenithJWT"),
)
.get_matches();
let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith"));
@@ -188,6 +241,9 @@ fn main() -> Result<()> {
args_params.or(file_params)
};
// Set CWD to workdir for non-daemon modes
env::set_current_dir(&workdir)?;
// Ensure the config is valid, even if just init-ing
let mut conf = params.try_into_config()?;
@@ -205,17 +261,15 @@ fn main() -> Result<()> {
// Create repo and exit if init was requested
if init {
branches::init_pageserver(conf, workdir, create_tenant)?;
branches::init_pageserver(conf, create_tenant)?;
// write the config file
let cfg_file_contents = toml::to_string_pretty(&params)?;
// TODO support enable-auth flag
std::fs::write(&cfg_file_path, cfg_file_contents)?;
return Ok(());
}
// Set CWD to workdir for non-daemon modes
env::set_current_dir(&workdir)?;
start_pageserver(conf)
}
@@ -262,11 +316,23 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// Initialize page cache, this will spawn walredo_thread
page_cache::init(conf);
// initialize authentication for incoming connections
let auth = match &conf.auth_type {
AuthType::Trust | AuthType::MD5 => Arc::new(None),
AuthType::ZenithJWT => {
// unwrap is ok because check is performed when creating config, so path is set and file exists
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
Arc::new(Some(JwtAuth::from_key_path(key_path)?))
}
};
info!("Using auth: {:#?}", conf.auth_type);
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
let page_service_thread = thread::Builder::new()
.name("Page Service thread".into())
.spawn(move || page_service::thread_main(conf, pageserver_listener))?;
.spawn(move || {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?;
page_service_thread
.join()

View File

@@ -7,7 +7,6 @@
use anyhow::{bail, ensure, Context, Result};
use postgres_ffi::ControlFileData;
use serde::{Deserialize, Serialize};
use std::env;
use std::{
fs,
path::Path,
@@ -15,6 +14,7 @@ use std::{
str::FromStr,
sync::Arc,
};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use log::*;
use zenith_utils::lsn::Lsn;
@@ -24,8 +24,7 @@ use crate::object_repository::ObjectRepository;
use crate::page_cache;
use crate::restore_local_repo;
use crate::walredo::WalRedoManager;
use crate::ZTenantId;
use crate::{repository::Repository, PageServerConf, ZTimelineId};
use crate::{repository::Repository, PageServerConf};
#[derive(Serialize, Deserialize, Clone)]
pub struct BranchInfo {
@@ -42,13 +41,7 @@ pub struct PointInTime {
pub lsn: Lsn,
}
pub fn init_pageserver(
conf: &'static PageServerConf,
workdir: &Path,
create_tenant: Option<&str>,
) -> Result<()> {
env::set_current_dir(workdir)?;
pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str>) -> Result<()> {
// Initialize logger
let (_scope_guard, _log_file) = logger::init_logging(&conf, "pageserver.log")?;
let _log_guard = slog_stdlog::init()?;

View File

@@ -1,12 +1,10 @@
use std::fmt;
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use hex::FromHex;
use lazy_static::lazy_static;
use rand::Rng;
use serde::{Deserialize, Serialize};
use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};
pub mod basebackup;
@@ -52,6 +50,10 @@ pub struct PageServerConf {
pub workdir: PathBuf,
pub pg_distrib_dir: PathBuf,
pub auth_type: AuthType,
pub auth_validation_public_key_path: Option<PathBuf>,
}
impl PageServerConf {
@@ -111,178 +113,3 @@ impl PageServerConf {
self.pg_distrib_dir.join("lib")
}
}
// Zenith ID is a 128-bit random ID.
// Used to represent various identifiers. Provides handy utility methods and impls.
// TODO (LizardWizzard) figure out best way to remove boiler plate with trait impls caused by newtype pattern
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
struct ZId([u8; 16]);
impl ZId {
pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZId {
let mut arr = [0u8; 16];
buf.copy_to_slice(&mut arr);
ZId::from(arr)
}
pub fn as_arr(&self) -> [u8; 16] {
self.0
}
pub fn generate() -> Self {
let mut tli_buf = [0u8; 16];
rand::thread_rng().fill(&mut tli_buf);
ZId::from(tli_buf)
}
}
impl FromStr for ZId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<ZId, Self::Err> {
Self::from_hex(s)
}
}
// this is needed for pretty serialization and deserialization of ZId's using serde integration with hex crate
impl FromHex for ZId {
type Error = hex::FromHexError;
fn from_hex<T: AsRef<[u8]>>(hex: T) -> Result<Self, Self::Error> {
let mut buf: [u8; 16] = [0u8; 16];
hex::decode_to_slice(hex, &mut buf)?;
Ok(ZId(buf))
}
}
impl AsRef<[u8]> for ZId {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
impl From<[u8; 16]> for ZId {
fn from(b: [u8; 16]) -> Self {
ZId(b)
}
}
impl fmt::Display for ZId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&hex::encode(self.0))
}
}
/// Zenith timeline IDs are different from PostgreSQL timeline
/// IDs. They serve a similar purpose though: they differentiate
/// between different "histories" of the same cluster. However,
/// PostgreSQL timeline IDs are a bit cumbersome, because they are only
/// 32-bits wide, and they must be in ascending order in any given
/// timeline history. Those limitations mean that we cannot generate a
/// new PostgreSQL timeline ID by just generating a random number. And
/// that in turn is problematic for the "pull/push" workflow, where you
/// have a local copy of a zenith repository, and you periodically sync
/// the local changes with a remote server. When you work "detached"
/// from the remote server, you cannot create a PostgreSQL timeline ID
/// that's guaranteed to be different from all existing timelines in
/// the remote server. For example, if two people are having a clone of
/// the repository on their laptops, and they both create a new branch
/// with different name. What timeline ID would they assign to their
/// branches? If they pick the same one, and later try to push the
/// branches to the same remote server, they will get mixed up.
///
/// To avoid those issues, Zenith has its own concept of timelines that
/// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ZTimelineId(ZId);
impl FromStr for ZTimelineId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<ZTimelineId, Self::Err> {
let value = ZId::from_str(s)?;
Ok(ZTimelineId(value))
}
}
impl From<[u8; 16]> for ZTimelineId {
fn from(b: [u8; 16]) -> Self {
ZTimelineId(ZId::from(b))
}
}
impl ZTimelineId {
pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTimelineId {
ZTimelineId(ZId::get_from_buf(buf))
}
pub fn as_arr(&self) -> [u8; 16] {
self.0.as_arr()
}
pub fn generate() -> Self {
ZTimelineId(ZId::generate())
}
}
impl fmt::Display for ZTimelineId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
// Zenith Tenant Id represents identifiar of a particular tenant.
// Is used for distinguishing requests and data belonging to different users.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ZTenantId(ZId);
impl FromStr for ZTenantId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<ZTenantId, Self::Err> {
let value = ZId::from_str(s)?;
Ok(ZTenantId(value))
}
}
impl From<[u8; 16]> for ZTenantId {
fn from(b: [u8; 16]) -> Self {
ZTenantId(ZId::from(b))
}
}
impl FromHex for ZTenantId {
type Error = hex::FromHexError;
fn from_hex<T: AsRef<[u8]>>(hex: T) -> Result<Self, Self::Error> {
Ok(ZTenantId(ZId::from_hex(hex)?))
}
}
impl AsRef<[u8]> for ZTenantId {
fn as_ref(&self) -> &[u8] {
&self.0 .0
}
}
impl ZTenantId {
pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTenantId {
ZTenantId(ZId::get_from_buf(buf))
}
pub fn as_arr(&self) -> [u8; 16] {
self.0.as_arr()
}
pub fn generate() -> Self {
ZTenantId(ZId::generate())
}
}
impl fmt::Display for ZTenantId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

View File

@@ -3,8 +3,8 @@
//!
use crate::relish::RelishTag;
use crate::ZTimelineId;
use serde::{Deserialize, Serialize};
use zenith_utils::zid::ZTimelineId;
///
/// ObjectKey is the key type used to identify objects stored in an object

View File

@@ -13,13 +13,13 @@
//! until we find the page we're looking for, making a separate lookup into the
//! key-value store for each timeline.
use crate::object_key::*;
use crate::object_store::ObjectStore;
use crate::relish::*;
use crate::repository::*;
use crate::restore_local_repo::import_timeline_wal;
use crate::walredo::WalRedoManager;
use crate::{object_key::*, ZTenantId};
use crate::{PageServerConf, ZTimelineId};
use crate::PageServerConf;
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use log::*;
@@ -33,6 +33,8 @@ use std::time::{Duration, Instant};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::ZTimelineId;
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
@@ -273,11 +275,13 @@ impl Timeline for ObjectTimeline {
// Handle truncated SLRU segments.
// XXX if this will turn out to be performance critical,
// move this check out of the funciton.
if let RelishTag::Slru{slru: _slru, segno: _segno} = rel
if let RelishTag::Slru {
slru: _slru,
segno: _segno,
} = rel
{
info!("test SLRU rel {:?} at {}", rel, req_lsn);
if !self.get_rel_exists(rel, req_lsn).unwrap_or(false)
{
if !self.get_rel_exists(rel, req_lsn).unwrap_or(false) {
info!("SLRU rel {:?} at {} doesn't exist", rel, req_lsn);
return Err(anyhow!("SLRU rel doesn't exist"));
}
@@ -350,9 +354,7 @@ impl Timeline for ObjectTimeline {
/// Does relation exist at given LSN?
fn get_rel_exists(&self, rel: RelishTag, req_lsn: Lsn) -> Result<bool> {
if let Some(_) = self.get_relish_size(rel, req_lsn)?
{
if let Some(_) = self.get_relish_size(rel, req_lsn)? {
trace!("Relation {} exists at {}", rel, req_lsn);
return Ok(true);
}
@@ -714,9 +716,11 @@ impl Timeline for ObjectTimeline {
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
prepared_horizon = Lsn::min(lsn, prepared_horizon);
if !self.get_tx_is_in_progress(xid, horizon)
{
info!("unlink twophase_file NOT TRANSACTION_STATUS_IN_PROGRESS {}", xid);
if !self.get_tx_is_in_progress(xid, horizon) {
info!(
"unlink twophase_file NOT TRANSACTION_STATUS_IN_PROGRESS {}",
xid
);
self.obj_store.unlink(&key, lsn)?;
result.prep_deleted += 1;
}

View File

@@ -2,11 +2,11 @@
//!
use crate::object_key::*;
use crate::relish::*;
use crate::ZTimelineId;
use anyhow::Result;
use std::collections::HashSet;
use std::iter::Iterator;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTimelineId;
///
/// Low-level storage abstraction.

View File

@@ -6,7 +6,7 @@ use crate::object_repository::ObjectRepository;
use crate::repository::Repository;
use crate::rocksdb_storage::RocksObjectStore;
use crate::walredo::PostgresRedoManager;
use crate::{PageServerConf, ZTenantId};
use crate::PageServerConf;
use anyhow::{anyhow, bail, Result};
use lazy_static::lazy_static;
use log::info;
@@ -14,6 +14,7 @@ use std::collections::HashMap;
use std::fs;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use zenith_utils::zid::ZTenantId;
lazy_static! {
pub static ref REPOSITORY: Mutex<HashMap<ZTenantId, Arc<dyn Repository>>> =

View File

@@ -10,22 +10,27 @@
// *callmemaybe <zenith timelineid> $url* -- ask pageserver to start walreceiver on $url
//
use anyhow::{anyhow, bail, ensure};
use anyhow::{anyhow, bail, ensure, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use log::*;
use regex::Regex;
use std::io::Write;
use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::{io, net::TcpStream};
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::auth::JwtAuth;
use zenith_utils::auth::{Claims, Scope};
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::postgres_backend::{self, AuthType};
use zenith_utils::pq_proto::{
BeMessage, FeMessage, RowDescriptor, HELLO_WORLD_ROW, SINGLE_COL_ROWDESC,
};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
use crate::basebackup;
@@ -35,8 +40,6 @@ use crate::relish::*;
use crate::repository::Modification;
use crate::walreceiver;
use crate::PageServerConf;
use crate::ZTenantId;
use crate::ZTimelineId;
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
@@ -140,22 +143,32 @@ impl PagestreamBeMessage {
///
/// Listens for connections, and launches a new handler thread for each.
///
pub fn thread_main(conf: &'static PageServerConf, listener: TcpListener) -> anyhow::Result<()> {
pub fn thread_main(
conf: &'static PageServerConf,
auth: Arc<Option<JwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
) -> anyhow::Result<()> {
loop {
let (socket, peer_addr) = listener.accept()?;
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true).unwrap();
let local_auth = Arc::clone(&auth);
thread::spawn(move || {
if let Err(err) = page_service_conn_main(conf, socket) {
if let Err(err) = page_service_conn_main(conf, local_auth, socket, auth_type) {
error!("error: {}", err);
}
});
}
}
fn page_service_conn_main(conf: &'static PageServerConf, socket: TcpStream) -> anyhow::Result<()> {
// Immediately increment the gauge, then create a job to decrement it on thread exit.
fn page_service_conn_main(
conf: &'static PageServerConf,
auth: Arc<Option<JwtAuth>>,
socket: TcpStream,
auth_type: AuthType,
) -> anyhow::Result<()> {
// Immediatsely increment the gauge, then create a job to decrement it on thread exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
@@ -164,14 +177,16 @@ fn page_service_conn_main(conf: &'static PageServerConf, socket: TcpStream) -> a
gauge.dec();
}
let mut conn_handler = PageServerHandler::new(conf);
let pgbackend = PostgresBackend::new(socket, AuthType::Trust)?;
let mut conn_handler = PageServerHandler::new(conf, auth);
let pgbackend = PostgresBackend::new(socket, auth_type)?;
pgbackend.run(&mut conn_handler)
}
#[derive(Debug)]
struct PageServerHandler {
conf: &'static PageServerConf,
auth: Arc<Option<JwtAuth>>,
claims: Option<Claims>,
}
const TIME_BUCKETS: &[f64] = &[
@@ -193,8 +208,12 @@ lazy_static! {
}
impl PageServerHandler {
pub fn new(conf: &'static PageServerConf) -> Self {
PageServerHandler { conf }
pub fn new(conf: &'static PageServerConf, auth: Arc<Option<JwtAuth>>) -> Self {
PageServerHandler {
conf,
auth,
claims: None,
}
}
fn handle_controlfile(&self, pgb: &mut PostgresBackend) -> io::Result<()> {
@@ -355,9 +374,68 @@ impl PageServerHandler {
Ok(())
}
// when accessing management api supply None as an argument
// when using to authorize tenant pass corresponding tenant id
fn check_permission(&self, tenantid: Option<ZTenantId>) -> Result<()> {
if self.auth.is_none() {
// auth is set to Trust, nothing to check so just return ok
return Ok(());
}
// auth is some, just checked above, when auth is some
// then claims are always present because of checks during connetion init
// so this expect won't trigger
let claims = self
.claims
.as_ref()
.expect("claims presence already checked");
match (&claims.scope, tenantid) {
(Scope::Tenant, None) => {
bail!("Attempt to access management api with tenant scope. Permission denied")
}
(Scope::Tenant, Some(tenantid)) => {
if claims.tenant_id.unwrap() != tenantid {
bail!("Tenant id mismatch. Permission denied")
}
Ok(())
}
(Scope::PageServerApi, None) => Ok(()), // access to management api for PageServerApi scope
(Scope::PageServerApi, Some(_)) => Ok(()), // access to tenant api using PageServerApi scope
}
}
}
impl postgres_backend::Handler for PageServerHandler {
fn check_auth_jwt(
&mut self,
_pgb: &mut PostgresBackend,
jwt_response: &[u8],
) -> anyhow::Result<()> {
// this unwrap is never triggered, because check_auth_jwt only called when auth_type is ZenithJWT
// which requires auth to be present
let data = self
.auth
.as_ref()
.as_ref()
.unwrap()
.decode(&str::from_utf8(jwt_response)?)?;
if matches!(data.claims.scope, Scope::Tenant) {
ensure!(
data.claims.tenant_id.is_some(),
"jwt token scope is Tenant, but tenant id is missing"
)
}
info!(
"jwt auth succeeded for scope: {:#?} by tenantid: {:?}",
data.claims.scope, data.claims.tenant_id,
);
self.claims = Some(data.claims);
Ok(())
}
fn process_query(
&mut self,
pgb: &mut PostgresBackend,
@@ -384,6 +462,8 @@ impl postgres_backend::Handler for PageServerHandler {
let tenantid = ZTenantId::from_str(params[0])?;
let timelineid = ZTimelineId::from_str(params[1])?;
self.check_permission(Some(tenantid))?;
self.handle_pagerequests(pgb, timelineid, tenantid)?;
} else if query_string.starts_with("basebackup ") {
let (_, params_raw) = query_string.split_at("basebackup ".len());
@@ -396,6 +476,8 @@ impl postgres_backend::Handler for PageServerHandler {
let tenantid = ZTenantId::from_str(params[0])?;
let timelineid = ZTimelineId::from_str(params[1])?;
self.check_permission(Some(tenantid))?;
// TODO are there any tests with lsn option?
let lsn = if params.len() == 3 {
Some(Lsn::from_str(params[2])?)
@@ -422,6 +504,8 @@ impl postgres_backend::Handler for PageServerHandler {
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let connstr = caps.get(3).unwrap().as_str().to_owned();
self.check_permission(Some(tenantid))?;
// Check that the timeline exists
let repository = page_cache::get_repository_for_tenant(&tenantid)?;
if repository.get_timeline(timelineid).is_err() {
@@ -445,6 +529,8 @@ impl postgres_backend::Handler for PageServerHandler {
let branchname = caps.get(2).ok_or_else(err)?.as_str().to_owned();
let startpoint_str = caps.get(3).ok_or_else(err)?.as_str().to_owned();
self.check_permission(Some(tenantid))?;
let branch =
branches::create_branch(&self.conf, &branchname, &startpoint_str, &tenantid)?;
let branch = serde_json::to_vec(&branch)?;
@@ -463,6 +549,8 @@ impl postgres_backend::Handler for PageServerHandler {
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
self.check_permission(Some(tenantid))?;
let start_lsn = Lsn(0); // TODO this needs to come from the repo
let timeline = page_cache::get_repository_for_tenant(&tenantid)?
.create_empty_timeline(timelineid, start_lsn)?;
@@ -505,6 +593,8 @@ impl postgres_backend::Handler for PageServerHandler {
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let postgres_connection_uri = caps.get(3).unwrap().as_str();
self.check_permission(Some(tenantid))?;
let timeline =
page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?;
@@ -551,6 +641,8 @@ impl postgres_backend::Handler for PageServerHandler {
let re = Regex::new(r"^tenant_create ([[:xdigit:]]+)$").unwrap();
let caps = re.captures(&query_string).ok_or_else(err)?;
self.check_permission(None)?;
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
page_cache::create_repository_for_tenant(&self.conf, tenantid)?;

View File

@@ -1,6 +1,5 @@
use crate::object_key::*;
use crate::relish::*;
use crate::ZTimelineId;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
@@ -12,6 +11,7 @@ use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTimelineId;
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
@@ -258,13 +258,15 @@ mod tests {
use crate::object_repository::{ObjectValue, PageEntry, RelationSizeEntry};
use crate::rocksdb_storage::RocksObjectStore;
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::{PageServerConf, ZTenantId};
use crate::PageServerConf;
use postgres_ffi::pg_constants;
use std::fs;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
/// Arbitrary relation tag, for testing.
const TESTREL_A: RelishTag = RelishTag::Relation(RelTag {
@@ -304,6 +306,8 @@ mod tests {
superuser: "zenith_admin".to_string(),
workdir: repo_dir,
pg_distrib_dir: "".into(),
auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
};
// Make a static copy of the config. This can never be free'd, but that's
// OK in a test.

View File

@@ -5,14 +5,14 @@ use crate::object_key::*;
use crate::object_store::ObjectStore;
use crate::relish::*;
use crate::PageServerConf;
use crate::ZTenantId;
use crate::ZTimelineId;
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::ZTimelineId;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StorageKey {

View File

@@ -9,8 +9,6 @@ use crate::relish::*;
use crate::restore_local_repo;
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::ZTenantId;
use crate::ZTimelineId;
use anyhow::{Error, Result};
use lazy_static::lazy_static;
use log::*;
@@ -32,6 +30,8 @@ use std::thread;
use std::thread::sleep;
use std::time::{Duration, SystemTime};
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::ZTimelineId;
//
// We keep one WAL Receiver active per timeline.

View File

@@ -36,13 +36,13 @@ use tokio::process::{ChildStdin, ChildStdout, Command};
use tokio::time::timeout;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTenantId;
use crate::relish::*;
use crate::repository::WALRecord;
use crate::waldecoder::XlXactParsedRecord;
use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
use crate::PageServerConf;
use crate::ZTenantId;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::XLogRecord;

View File

@@ -7,6 +7,7 @@ name = "pypi"
pytest = ">=6.0.0"
psycopg2 = "*"
typing-extensions = "*"
pyjwt = {extras = ["crypto"], version = "*"}
[dev-packages]
yapf = "*"

226
test_runner/Pipfile.lock generated
View File

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "4c20c05c20c50cf7e8f78ab461ab23841125345e63e00e2efa7661c165b6b364"
"sha256": "f60a966726bcc19670402ad3fa57396b5dacf0a027544418ceb7cc0d42d94a52"
},
"pipfile-spec": 6,
"requires": {
@@ -24,13 +24,72 @@
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
"version": "==21.2.0"
},
"importlib-metadata": {
"cffi": {
"hashes": [
"sha256:833b26fb89d5de469b24a390e9df088d4e52e4ba33b01dc5e0e4f41b81a16c00",
"sha256:b142cc1dd1342f31ff04bb7d022492b09920cb64fed867cd3ea6f80fe3ebd139"
"sha256:06c54a68935738d206570b20da5ef2b6b6d92b38ef3ec45c5422c0ebaf338d4d",
"sha256:0c0591bee64e438883b0c92a7bed78f6290d40bf02e54c5bf0978eaf36061771",
"sha256:19ca0dbdeda3b2615421d54bef8985f72af6e0c47082a8d26122adac81a95872",
"sha256:22b9c3c320171c108e903d61a3723b51e37aaa8c81255b5e7ce102775bd01e2c",
"sha256:26bb2549b72708c833f5abe62b756176022a7b9a7f689b571e74c8478ead51dc",
"sha256:33791e8a2dc2953f28b8d8d300dde42dd929ac28f974c4b4c6272cb2955cb762",
"sha256:3c8d896becff2fa653dc4438b54a5a25a971d1f4110b32bd3068db3722c80202",
"sha256:4373612d59c404baeb7cbd788a18b2b2a8331abcc84c3ba40051fcd18b17a4d5",
"sha256:487d63e1454627c8e47dd230025780e91869cfba4c753a74fda196a1f6ad6548",
"sha256:48916e459c54c4a70e52745639f1db524542140433599e13911b2f329834276a",
"sha256:4922cd707b25e623b902c86188aca466d3620892db76c0bdd7b99a3d5e61d35f",
"sha256:55af55e32ae468e9946f741a5d51f9896da6b9bf0bbdd326843fec05c730eb20",
"sha256:57e555a9feb4a8460415f1aac331a2dc833b1115284f7ded7278b54afc5bd218",
"sha256:5d4b68e216fc65e9fe4f524c177b54964af043dde734807586cf5435af84045c",
"sha256:64fda793737bc4037521d4899be780534b9aea552eb673b9833b01f945904c2e",
"sha256:6d6169cb3c6c2ad50db5b868db6491a790300ade1ed5d1da29289d73bbe40b56",
"sha256:7bcac9a2b4fdbed2c16fa5681356d7121ecabf041f18d97ed5b8e0dd38a80224",
"sha256:80b06212075346b5546b0417b9f2bf467fea3bfe7352f781ffc05a8ab24ba14a",
"sha256:818014c754cd3dba7229c0f5884396264d51ffb87ec86e927ef0be140bfdb0d2",
"sha256:8eb687582ed7cd8c4bdbff3df6c0da443eb89c3c72e6e5dcdd9c81729712791a",
"sha256:99f27fefe34c37ba9875f224a8f36e31d744d8083e00f520f133cab79ad5e819",
"sha256:9f3e33c28cd39d1b655ed1ba7247133b6f7fc16fa16887b120c0c670e35ce346",
"sha256:a8661b2ce9694ca01c529bfa204dbb144b275a31685a075ce123f12331be790b",
"sha256:a9da7010cec5a12193d1af9872a00888f396aba3dc79186604a09ea3ee7c029e",
"sha256:aedb15f0a5a5949ecb129a82b72b19df97bbbca024081ed2ef88bd5c0a610534",
"sha256:b315d709717a99f4b27b59b021e6207c64620790ca3e0bde636a6c7f14618abb",
"sha256:ba6f2b3f452e150945d58f4badd92310449876c4c954836cfb1803bdd7b422f0",
"sha256:c33d18eb6e6bc36f09d793c0dc58b0211fccc6ae5149b808da4a62660678b156",
"sha256:c9a875ce9d7fe32887784274dd533c57909b7b1dcadcc128a2ac21331a9765dd",
"sha256:c9e005e9bd57bc987764c32a1bee4364c44fdc11a3cc20a40b93b444984f2b87",
"sha256:d2ad4d668a5c0645d281dcd17aff2be3212bc109b33814bbb15c4939f44181cc",
"sha256:d950695ae4381ecd856bcaf2b1e866720e4ab9a1498cba61c602e56630ca7195",
"sha256:e22dcb48709fc51a7b58a927391b23ab37eb3737a98ac4338e2448bef8559b33",
"sha256:e8c6a99be100371dbb046880e7a282152aa5d6127ae01783e37662ef73850d8f",
"sha256:e9dc245e3ac69c92ee4c167fbdd7428ec1956d4e754223124991ef29eb57a09d",
"sha256:eb687a11f0a7a1839719edd80f41e459cc5366857ecbed383ff376c4e3cc6afd",
"sha256:eb9e2a346c5238a30a746893f23a9535e700f8192a68c07c0258e7ece6ff3728",
"sha256:ed38b924ce794e505647f7c331b22a693bee1538fdf46b0222c4717b42f744e7",
"sha256:f0010c6f9d1a4011e429109fda55a225921e3206e7f62a0c22a35344bfd13cca",
"sha256:f0c5d1acbfca6ebdd6b1e3eded8d261affb6ddcf2186205518f1428b8569bb99",
"sha256:f10afb1004f102c7868ebfe91c28f4a712227fe4cb24974350ace1f90e1febbf",
"sha256:f174135f5609428cc6e1b9090f9268f5c8935fddb1b25ccb8255a2d50de6789e",
"sha256:f3ebe6e73c319340830a9b2825d32eb6d8475c1dac020b4f0aa774ee3b898d1c",
"sha256:f627688813d0a4140153ff532537fbe4afea5a3dffce1f9deb7f91f848a832b5",
"sha256:fd4305f86f53dfd8cd3522269ed7fc34856a8ee3709a5e28b2836b2db9d4cd69"
],
"markers": "python_version < '3.8'",
"version": "==4.5.0"
"version": "==1.14.6"
},
"cryptography": {
"hashes": [
"sha256:0f1212a66329c80d68aeeb39b8a16d54ef57071bf22ff4e521657b27372e327d",
"sha256:1e056c28420c072c5e3cb36e2b23ee55e260cb04eee08f702e0edfec3fb51959",
"sha256:240f5c21aef0b73f40bb9f78d2caff73186700bf1bc6b94285699aff98cc16c6",
"sha256:26965837447f9c82f1855e0bc8bc4fb910240b6e0d16a664bb722df3b5b06873",
"sha256:37340614f8a5d2fb9aeea67fd159bfe4f5f4ed535b1090ce8ec428b2f15a11f2",
"sha256:3d10de8116d25649631977cb37da6cbdd2d6fa0e0281d014a5b7d337255ca713",
"sha256:3d8427734c781ea5f1b41d6589c293089704d4759e34597dce91014ac125aad1",
"sha256:7ec5d3b029f5fa2b179325908b9cd93db28ab7b85bb6c1db56b10e0b54235177",
"sha256:8e56e16617872b0957d1c9742a3f94b43533447fd78321514abbe7db216aa250",
"sha256:de4e5f7f68220d92b7637fc99847475b59154b7a1b3868fb7385337af54ac9ca",
"sha256:eb8cc2afe8b05acbd84a43905832ec78e7b3873fb124ca190f574dca7389a87d",
"sha256:ee77aa129f481be46f8d92a1a7db57269a2f23052d5f2433b4621bb457081cc9"
],
"version": "==3.4.7"
},
"iniconfig": {
"hashes": [
@@ -41,11 +100,11 @@
},
"packaging": {
"hashes": [
"sha256:5b327ac1320dc863dca72f4514ecc086f31186744b84a230374cc1fd776feae5",
"sha256:67714da7f7bc052e064859c05c595155bd1ee9f69f76557e21f051443c20947a"
"sha256:7dc96269f53a4ccec5c0670940a4281106dd0bb343f47b7471f779df49c2fbe7",
"sha256:c86254f9220d55e31cc94d69bade760f0847da8000def4dfe1c6b872fd14ff14"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==20.9"
"markers": "python_version >= '3.6'",
"version": "==21.0"
},
"pluggy": {
"hashes": [
@@ -57,18 +116,18 @@
},
"psycopg2": {
"hashes": [
"sha256:03a485bf71498870e38b535c0e6e7162d6ac06a91487edddc3b959894d65f79c",
"sha256:22102cfeb904898254f287b1a77360bf66c636858e7476593acd5267e5c24ff9",
"sha256:8f4c1800e57ad128d20b2e91d222ca238fffd316cef65be781361cdf35e37979",
"sha256:b12073fdf2002e828e5921be2c39ff9c6eab361c5c0bd6c529619fc23677accc",
"sha256:b6f47af317af8110818d255e693cfa80b7f1e435285be09778db7b66efd95789",
"sha256:d549db98fc0e6db41a2aa0d65f7434c4308a9f64012adb209b9e489f26fe87c6",
"sha256:e44e39a46af7c30566b7667fb27e701e652ab0a51e05c263a01d3ff0e223b765",
"sha256:e84c80be7a238d3c9c099b71f6890eaa35fc881146232cce888a88ab1bfb431e",
"sha256:f3d42bd42302293767b84206d9a446abc67ed4a133e4fe04dad8952de06c2091"
"sha256:079d97fc22de90da1d370c90583659a9f9a6ee4007355f5825e5f1c70dffc1fa",
"sha256:2087013c159a73e09713294a44d0c8008204d06326006b7f652bef5ace66eebb",
"sha256:2c992196719fadda59f72d44603ee1a2fdcc67de097eea38d41c7ad9ad246e62",
"sha256:7640e1e4d72444ef012e275e7b53204d7fab341fb22bc76057ede22fe6860b25",
"sha256:7f91312f065df517187134cce8e395ab37f5b601a42446bdc0f0d51773621854",
"sha256:830c8e8dddab6b6716a4bf73a09910c7954a92f40cf1d1e702fb93c8a919cc56",
"sha256:89409d369f4882c47f7ea20c42c5046879ce22c1e4ea20ef3b00a4dfc0a7f188",
"sha256:bf35a25f1aaa8a3781195595577fcbb59934856ee46b4f252f56ad12b8043bcf",
"sha256:de5303a6f1d0a7a34b9d40e4d3bef684ccc44a49bbe3eb85e3c0bffb4a131b7c"
],
"index": "pypi",
"version": "==2.9"
"version": "==2.9.1"
},
"py": {
"hashes": [
@@ -78,6 +137,25 @@
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==1.10.0"
},
"pycparser": {
"hashes": [
"sha256:2d475327684562c3a96cc71adf7dc8c4f0565175cf86b6d7a404ff4c771f15f0",
"sha256:7582ad22678f0fcd81102833f60ef8d0e57288b6b5fb00323d101be910e35705"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==2.20"
},
"pyjwt": {
"extras": [
"crypto"
],
"hashes": [
"sha256:934d73fbba91b0483d3857d1aff50e96b2a892384ee2c17417ed3203f173fca1",
"sha256:fba44e7898bbca160a2b2b501f492824fc8382485d3a6f11ba5d0c1937ce6130"
],
"index": "pypi",
"version": "==2.1.0"
},
"pyparsing": {
"hashes": [
"sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1",
@@ -110,14 +188,6 @@
],
"index": "pypi",
"version": "==3.10.0.0"
},
"zipp": {
"hashes": [
"sha256:3607921face881ba3e026887d8150cca609d517579abe052ac81fc5aeffdbd76",
"sha256:51cb66cc54621609dd593d1787f286ee42a5c0adbb4b29abea5a63edc3e03098"
],
"markers": "python_version >= '3.6'",
"version": "==3.4.1"
}
},
"develop": {
@@ -129,14 +199,6 @@
"index": "pypi",
"version": "==3.9.2"
},
"importlib-metadata": {
"hashes": [
"sha256:833b26fb89d5de469b24a390e9df088d4e52e4ba33b01dc5e0e4f41b81a16c00",
"sha256:b142cc1dd1342f31ff04bb7d022492b09920cb64fed867cd3ea6f80fe3ebd139"
],
"markers": "python_version < '3.8'",
"version": "==4.5.0"
},
"mccabe": {
"hashes": [
"sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42",
@@ -146,32 +208,32 @@
},
"mypy": {
"hashes": [
"sha256:0190fb77e93ce971954c9e54ea61de2802065174e5e990c9d4c1d0f54fbeeca2",
"sha256:0756529da2dd4d53d26096b7969ce0a47997123261a5432b48cc6848a2cb0bd4",
"sha256:2f9fedc1f186697fda191e634ac1d02f03d4c260212ccb018fabbb6d4b03eee8",
"sha256:353aac2ce41ddeaf7599f1c73fed2b75750bef3b44b6ad12985a991bc002a0da",
"sha256:3f12705eabdd274b98f676e3e5a89f247ea86dc1af48a2d5a2b080abac4e1243",
"sha256:4efc67b9b3e2fddbe395700f91d5b8deb5980bfaaccb77b306310bd0b9e002eb",
"sha256:517e7528d1be7e187a5db7f0a3e479747307c1b897d9706b1c662014faba3116",
"sha256:68a098c104ae2b75e946b107ef69dd8398d54cb52ad57580dfb9fc78f7f997f0",
"sha256:746e0b0101b8efec34902810047f26a8c80e1efbb4fc554956d848c05ef85d76",
"sha256:8be7bbd091886bde9fcafed8dd089a766fa76eb223135fe5c9e9798f78023a20",
"sha256:9236c21194fde5df1b4d8ebc2ef2c1f2a5dc7f18bcbea54274937cae2e20a01c",
"sha256:9ef5355eaaf7a23ab157c21a44c614365238a7bdb3552ec3b80c393697d974e1",
"sha256:9f1d74eeb3f58c7bd3f3f92b8f63cb1678466a55e2c4612bf36909105d0724ab",
"sha256:a26d0e53e90815c765f91966442775cf03b8a7514a4e960de7b5320208b07269",
"sha256:ae94c31bb556ddb2310e4f913b706696ccbd43c62d3331cd3511caef466871d2",
"sha256:b5ba1f0d5f9087e03bf5958c28d421a03a4c1ad260bf81556195dffeccd979c4",
"sha256:b5dfcd22c6bab08dfeded8d5b44bdcb68c6f1ab261861e35c470b89074f78a70",
"sha256:cd01c599cf9f897b6b6c6b5d8b182557fb7d99326bcdf5d449a0fbbb4ccee4b9",
"sha256:e89880168c67cf4fde4506b80ee42f1537ad66ad366c101d388b3fd7d7ce2afd",
"sha256:ebe2bc9cb638475f5d39068d2dbe8ae1d605bb8d8d3ff281c695df1670ab3987",
"sha256:f89bfda7f0f66b789792ab64ce0978e4a991a0e4dd6197349d0767b0f1095b21",
"sha256:fc4d63da57ef0e8cd4ab45131f3fe5c286ce7dd7f032650d0fbc239c6190e167",
"sha256:fd634bc17b1e2d6ce716f0e43446d0d61cdadb1efcad5c56ca211c22b246ebc8"
"sha256:088cd9c7904b4ad80bec811053272986611b84221835e079be5bcad029e79dd9",
"sha256:0aadfb2d3935988ec3815952e44058a3100499f5be5b28c34ac9d79f002a4a9a",
"sha256:119bed3832d961f3a880787bf621634ba042cb8dc850a7429f643508eeac97b9",
"sha256:1a85e280d4d217150ce8cb1a6dddffd14e753a4e0c3cf90baabb32cefa41b59e",
"sha256:3c4b8ca36877fc75339253721f69603a9c7fdb5d4d5a95a1a1b899d8b86a4de2",
"sha256:3e382b29f8e0ccf19a2df2b29a167591245df90c0b5a2542249873b5c1d78212",
"sha256:42c266ced41b65ed40a282c575705325fa7991af370036d3f134518336636f5b",
"sha256:53fd2eb27a8ee2892614370896956af2ff61254c275aaee4c230ae771cadd885",
"sha256:704098302473cb31a218f1775a873b376b30b4c18229421e9e9dc8916fd16150",
"sha256:7df1ead20c81371ccd6091fa3e2878559b5c4d4caadaf1a484cf88d93ca06703",
"sha256:866c41f28cee548475f146aa4d39a51cf3b6a84246969f3759cb3e9c742fc072",
"sha256:a155d80ea6cee511a3694b108c4494a39f42de11ee4e61e72bc424c490e46457",
"sha256:adaeee09bfde366d2c13fe6093a7df5df83c9a2ba98638c7d76b010694db760e",
"sha256:b6fb13123aeef4a3abbcfd7e71773ff3ff1526a7d3dc538f3929a49b42be03f0",
"sha256:b94e4b785e304a04ea0828759172a15add27088520dc7e49ceade7834275bedb",
"sha256:c0df2d30ed496a08de5daed2a9ea807d07c21ae0ab23acf541ab88c24b26ab97",
"sha256:c6c2602dffb74867498f86e6129fd52a2770c48b7cd3ece77ada4fa38f94eba8",
"sha256:ceb6e0a6e27fb364fb3853389607cf7eb3a126ad335790fa1e14ed02fba50811",
"sha256:d9dd839eb0dc1bbe866a288ba3c1afc33a202015d2ad83b31e875b5905a079b6",
"sha256:e4dab234478e3bd3ce83bac4193b2ecd9cf94e720ddd95ce69840273bf44f6de",
"sha256:ec4e0cd079db280b6bdabdc807047ff3e199f334050db5cbb91ba3e959a67504",
"sha256:ecd2c3fe726758037234c93df7e98deb257fd15c24c9180dacf1ef829da5f921",
"sha256:ef565033fa5a958e62796867b1df10c40263ea9ded87164d67572834e57a174d"
],
"index": "pypi",
"version": "==0.902"
"version": "==0.910"
},
"mypy-extensions": {
"hashes": [
@@ -204,42 +266,6 @@
"markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==0.10.2"
},
"typed-ast": {
"hashes": [
"sha256:01ae5f73431d21eead5015997ab41afa53aa1fbe252f9da060be5dad2c730ace",
"sha256:067a74454df670dcaa4e59349a2e5c81e567d8d65458d480a5b3dfecec08c5ff",
"sha256:0fb71b8c643187d7492c1f8352f2c15b4c4af3f6338f21681d3681b3dc31a266",
"sha256:1b3ead4a96c9101bef08f9f7d1217c096f31667617b58de957f690c92378b528",
"sha256:2068531575a125b87a41802130fa7e29f26c09a2833fea68d9a40cf33902eba6",
"sha256:209596a4ec71d990d71d5e0d312ac935d86930e6eecff6ccc7007fe54d703808",
"sha256:2c726c276d09fc5c414693a2de063f521052d9ea7c240ce553316f70656c84d4",
"sha256:398e44cd480f4d2b7ee8d98385ca104e35c81525dd98c519acff1b79bdaac363",
"sha256:52b1eb8c83f178ab787f3a4283f68258525f8d70f778a2f6dd54d3b5e5fb4341",
"sha256:5feca99c17af94057417d744607b82dd0a664fd5e4ca98061480fd8b14b18d04",
"sha256:7538e495704e2ccda9b234b82423a4038f324f3a10c43bc088a1636180f11a41",
"sha256:760ad187b1041a154f0e4d0f6aae3e40fdb51d6de16e5c99aedadd9246450e9e",
"sha256:777a26c84bea6cd934422ac2e3b78863a37017618b6e5c08f92ef69853e765d3",
"sha256:95431a26309a21874005845c21118c83991c63ea800dd44843e42a916aec5899",
"sha256:9ad2c92ec681e02baf81fdfa056fe0d818645efa9af1f1cd5fd6f1bd2bdfd805",
"sha256:9c6d1a54552b5330bc657b7ef0eae25d00ba7ffe85d9ea8ae6540d2197a3788c",
"sha256:aee0c1256be6c07bd3e1263ff920c325b59849dc95392a05f258bb9b259cf39c",
"sha256:af3d4a73793725138d6b334d9d247ce7e5f084d96284ed23f22ee626a7b88e39",
"sha256:b36b4f3920103a25e1d5d024d155c504080959582b928e91cb608a65c3a49e1a",
"sha256:b9574c6f03f685070d859e75c7f9eeca02d6933273b5e69572e5ff9d5e3931c3",
"sha256:bff6ad71c81b3bba8fa35f0f1921fb24ff4476235a6e94a26ada2e54370e6da7",
"sha256:c190f0899e9f9f8b6b7863debfb739abcb21a5c054f911ca3596d12b8a4c4c7f",
"sha256:c907f561b1e83e93fad565bac5ba9c22d96a54e7ea0267c708bffe863cbe4075",
"sha256:cae53c389825d3b46fb37538441f75d6aecc4174f615d048321b716df2757fb0",
"sha256:dd4a21253f42b8d2b48410cb31fe501d32f8b9fbeb1f55063ad102fe9c425e40",
"sha256:dde816ca9dac1d9c01dd504ea5967821606f02e510438120091b84e852367428",
"sha256:f2362f3cb0f3172c42938946dbc5b7843c2a28aec307c49100c8b38764eb6927",
"sha256:f328adcfebed9f11301eaedfa48e15bdece9b519fb27e6a8c01aa52a17ec31b3",
"sha256:f8afcf15cc511ada719a88e013cec87c11aff7b91f019295eb4530f96fe5ef2f",
"sha256:fb1bbeac803adea29cedd70781399c99138358c26d05fcbd23c13016b7f5ec65"
],
"markers": "python_version < '3.8'",
"version": "==1.4.3"
},
"typing-extensions": {
"hashes": [
"sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497",
@@ -256,14 +282,6 @@
],
"index": "pypi",
"version": "==0.31.0"
},
"zipp": {
"hashes": [
"sha256:3607921face881ba3e026887d8150cca609d517579abe052ac81fc5aeffdbd76",
"sha256:51cb66cc54621609dd593d1787f286ee42a5c0adbb4b29abea5a63edc3e03098"
],
"markers": "python_version >= '3.6'",
"version": "==3.4.1"
}
}
}

View File

@@ -0,0 +1,98 @@
from contextlib import closing
from pathlib import Path
from uuid import uuid4
from dataclasses import dataclass
import jwt
import psycopg2
from fixtures.zenith_fixtures import Postgres, ZenithCli, ZenithPageserver
import pytest
@pytest.fixture
def pageserver_auth_enabled(zenith_cli: ZenithCli):
with ZenithPageserver(zenith_cli).init(enable_auth=True).start() as ps:
# For convenience in tests, create a branch from the freshly-initialized cluster.
zenith_cli.run(["branch", "empty", "main"])
yield ps
@dataclass
class AuthKeys:
pub: bytes
priv: bytes
@pytest.fixture
def auth_keys(repo_dir: str) -> AuthKeys:
# TODO probably this should be specified in cli config and used in tests for single source of truth
pub = (Path(repo_dir) / 'auth_public_key.pem').read_bytes()
priv = (Path(repo_dir) / 'auth_private_key.pem').read_bytes()
return AuthKeys(pub=pub, priv=priv)
def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver, auth_keys: AuthKeys):
ps = pageserver_auth_enabled
tenant_token = jwt.encode({"scope": "tenant", "tenant_id": ps.initial_tenant}, auth_keys.priv, algorithm="RS256")
invalid_tenant_token = jwt.encode({"scope": "tenant", "tenant_id": uuid4().hex}, auth_keys.priv, algorithm="RS256")
management_token = jwt.encode({"scope": "pageserverapi"}, auth_keys.priv, algorithm="RS256")
# this does not invoke auth check and only decodes jwt and checks it for validity
# check both tokens
ps.safe_psql("status", password=tenant_token)
ps.safe_psql("status", password=management_token)
# tenant can create branches
ps.safe_psql(f"branch_create {ps.initial_tenant} new1 main", password=tenant_token)
# console can create branches for tenant
ps.safe_psql(f"branch_create {ps.initial_tenant} new2 main", password=management_token)
# fail to create branch using token with different tenantid
with pytest.raises(psycopg2.DatabaseError, match='Tenant id mismatch. Permission denied'):
ps.safe_psql(f"branch_create {ps.initial_tenant} new2 main", password=invalid_tenant_token)
# create tenant using management token
ps.safe_psql(f"tenant_create {uuid4().hex}", password=management_token)
# fail to create tenant using tenant token
with pytest.raises(psycopg2.DatabaseError, match='Attempt to access management api with tenant scope. Permission denied'):
ps.safe_psql(f"tenant_create {uuid4().hex}", password=tenant_token)
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
def test_compute_auth_to_pageserver(
zenith_cli: ZenithCli,
wa_factory,
pageserver_auth_enabled: ZenithPageserver,
repo_dir: str,
with_wal_acceptors: bool,
auth_keys: AuthKeys,
):
ps = pageserver_auth_enabled
# since we are in progress of refactoring protocols between compute safekeeper and page server
# use hardcoded management token in safekeeper
management_token = jwt.encode({"scope": "pageserverapi"}, auth_keys.priv, algorithm="RS256")
branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}"
zenith_cli.run(["branch", branch, "empty"])
if with_wal_acceptors:
wa_factory.start_n_new(3, management_token)
with Postgres(
zenith_cli=zenith_cli,
repo_dir=repo_dir,
tenant_id=ps.initial_tenant,
port=55432, # FIXME port distribution is hardcoded in tests and in cli
).create_start(
branch,
wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None,
) as pg:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (5000050000, )

View File

@@ -87,23 +87,30 @@ class PgProtocol:
self.port = port
self.username = username or "zenith_admin"
def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None) -> str:
def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None, password: Optional[str] = None) -> str:
"""
Build a libpq connection string for the Postgres instance.
"""
username = username or self.username
return f'host={self.host} port={self.port} user={username} dbname={dbname}'
res = f'host={self.host} port={self.port} user={username} dbname={dbname}'
if not password:
return res
return f'{res} password={password}'
# autocommit=True here by default because that's what we need most of the time
def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection:
def connect(self, *, autocommit=True, dbname: str = 'postgres', username: Optional[str] = None, password: Optional[str] = None) -> PgConnection:
"""
Connect to the node.
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(self.connstr(**kwargs))
conn = psycopg2.connect(self.connstr(
dbname=dbname,
username=username,
password=password,
))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
return conn
@@ -175,12 +182,15 @@ class ZenithPageserver(PgProtocol):
self.running = False
self.initial_tenant = None
def init(self) -> 'ZenithPageserver':
def init(self, enable_auth: bool = False) -> 'ZenithPageserver':
"""
Initialize the repository, i.e. run "zenith init".
Returns self.
"""
self.zenith_cli.run(['init'])
cmd = ['init']
if enable_auth:
cmd.append('--enable-auth')
self.zenith_cli.run(cmd)
return self
def start(self) -> 'ZenithPageserver':
@@ -207,6 +217,12 @@ class ZenithPageserver(PgProtocol):
return self
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
self.stop()
@zenfixture
def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
@@ -235,11 +251,10 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, instance_num: int, tenant_id: str):
super().__init__(host='localhost', port=55431 + instance_num)
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int):
super().__init__(host='localhost', port=port)
self.zenith_cli = zenith_cli
self.instance_num = instance_num
self.running = False
self.repo_dir = repo_dir
self.branch: Optional[str] = None # dubious, see asserts below
@@ -373,15 +388,22 @@ class Postgres(PgProtocol):
return self
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
self.stop()
class PostgresFactory:
""" An object representing multiple running postgres daemons. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str):
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431):
self.zenith_cli = zenith_cli
self.repo_dir = repo_dir
self.num_instances = 0
self.instances: List[Postgres] = []
self.initial_tenant: str = initial_tenant
self.base_port = base_port
def create_start(
self,
@@ -391,7 +413,13 @@ class PostgresFactory:
config_lines: Optional[List[str]] = None
) -> Postgres:
pg = Postgres(self.zenith_cli, self.repo_dir, self.num_instances + 1, tenant_id=tenant_id or self.initial_tenant)
pg = Postgres(
zenith_cli=self.zenith_cli,
repo_dir=self.repo_dir,
tenant_id=tenant_id or self.initial_tenant,
port=self.base_port + self.num_instances + 1,
)
self.num_instances += 1
self.instances.append(pg)
@@ -490,11 +518,12 @@ def read_pid(path):
class WalAcceptor:
""" An object representing a running wal acceptor daemon. """
def __init__(self, wa_binpath, data_dir, port, num):
def __init__(self, wa_binpath, data_dir, port, num, auth_token: Optional[str] = None):
self.wa_binpath = wa_binpath
self.data_dir = data_dir
self.port = port
self.num = num # identifier for logging
self.auth_token = auth_token
def start(self) -> 'WalAcceptor':
# create data directory if not exists
@@ -509,7 +538,8 @@ class WalAcceptor:
cmd.extend(["--pageserver", "localhost:{}".format(DEFAULT_PAGESERVER_PORT)])
cmd.extend(["--recall", "1 second"])
print('Running command "{}"'.format(' '.join(cmd)))
subprocess.run(cmd, check=True)
env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None
subprocess.run(cmd, check=True, env=env)
return self
@@ -545,26 +575,30 @@ class WalAcceptorFactory:
self.instances = []
self.initial_port = 54321
def start_new(self) -> WalAcceptor:
def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor:
"""
Start new wal acceptor.
"""
wa_num = len(self.instances)
wa = WalAcceptor(self.wa_binpath,
os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)),
self.initial_port + wa_num, wa_num)
wa = WalAcceptor(
self.wa_binpath,
os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)),
self.initial_port + wa_num,
wa_num,
auth_token,
)
wa.start()
self.instances.append(wa)
return wa
def start_n_new(self, n: int) -> None:
def start_n_new(self, n: int, auth_token: Optional[str] = None) -> None:
"""
Start n new wal acceptors.
"""
for _ in range(n):
self.start_new()
self.start_new(auth_token)
def stop_all(self) -> 'WalAcceptorFactory':
for wa in self.instances:

View File

@@ -6,9 +6,9 @@ use clap::{App, Arg};
use daemonize::Daemonize;
use log::*;
use slog::Drain;
use std::io;
use std::path::{Path, PathBuf};
use std::thread;
use std::{env, io};
use std::{fs::File, fs::OpenOptions};
use walkeeper::s3_offload;
@@ -74,6 +74,7 @@ fn main() -> Result<()> {
listen_addr: "localhost:5454".to_string(),
ttl: None,
recall_period: None,
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
};
if let Some(dir) = arg_matches.value_of("datadir") {

View File

@@ -16,6 +16,8 @@ pub struct WalAcceptorConf {
pub no_sync: bool,
pub listen_addr: String,
pub pageserver_addr: Option<String>,
// TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework
pub pageserver_auth_token: Option<String>,
pub ttl: Option<Duration>,
pub recall_period: Option<Duration>,
}

View File

@@ -17,11 +17,11 @@ use std::thread::sleep;
use zenith_utils::bin_ser::LeSer;
use zenith_utils::connstring::connection_host_port;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::replication::HotStandbyFeedback;
use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
use pageserver::{ZTenantId, ZTimelineId};
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
use zenith_utils::pq_proto::SystemId;
@@ -153,7 +153,11 @@ pub struct ReceiveWalConn {
///
fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
let ps_addr = conf.pageserver_addr.unwrap();
let ps_connstr = format!("postgresql://no_user@{}/no_db", ps_addr);
let ps_connstr = format!(
"postgresql://no_user:{}@{}/no_db",
&conf.pageserver_auth_token.unwrap_or_default(),
ps_addr
);
// use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr);

View File

@@ -7,12 +7,12 @@ use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
use anyhow::{bail, Result};
use bytes::Bytes;
use pageserver::ZTimelineId;
use std::str::FromStr;
use std::sync::Arc;
use zenith_utils::postgres_backend;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor};
use zenith_utils::zid::ZTimelineId;
/// Handler for streaming WAL from acceptor
pub struct SendWalHandler {

View File

@@ -5,7 +5,6 @@ use anyhow::{bail, Result};
use fs2::FileExt;
use lazy_static::lazy_static;
use log::*;
use pageserver::ZTimelineId;
use postgres_ffi::xlog_utils::{find_end_of_wal, TimeLineID};
use std::cmp::{max, min};
use std::collections::HashMap;
@@ -15,6 +14,7 @@ use std::path::Path;
use std::sync::{Arc, Condvar, Mutex};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTimelineId;
use crate::receive_wal::{SafeKeeperInfo, CONTROL_FILE_NAME, SK_FORMAT_VERSION, SK_MAGIC};
use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER};

View File

@@ -12,7 +12,7 @@ anyhow = "1.0"
serde_json = "1"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
# FIXME: 'pageserver' is needed for BranchInfo. Refactor
pageserver = { path = "../pageserver" }
control_plane = { path = "../control_plane" }
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -4,13 +4,15 @@ use clap::{App, AppSettings, Arg, ArgMatches, SubCommand};
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env::{self, LocalEnv};
use control_plane::storage::PageServerNode;
use pageserver::ZTenantId;
use std::collections::btree_map::Entry;
use std::collections::HashMap;
use std::process::exit;
use std::str::FromStr;
use zenith_utils::auth::{encode_from_key_path, Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use pageserver::{branches::BranchInfo, ZTimelineId};
use pageserver::branches::BranchInfo;
use zenith_utils::lsn::Lsn;
///
@@ -53,6 +55,12 @@ fn main() -> Result<()> {
.long("remote-pageserver")
.required(false)
.value_name("pageserver-url"),
)
.arg(
Arg::with_name("enable-auth")
.long("enable-auth")
.takes_value(false)
.help("Enable authentication using ZenithJWT")
),
)
.subcommand(
@@ -115,10 +123,16 @@ fn main() -> Result<()> {
.get_matches();
// Create config file
if let ("init", Some(sub_args)) = matches.subcommand() {
if let ("init", Some(init_match)) = matches.subcommand() {
let tenantid = ZTenantId::generate();
let pageserver_uri = sub_args.value_of("pageserver-url");
local_env::init(pageserver_uri, tenantid)
let pageserver_uri = init_match.value_of("pageserver-url");
let auth_type = if init_match.is_present("enable-auth") {
AuthType::ZenithJWT
} else {
AuthType::Trust
};
local_env::init(pageserver_uri, tenantid, auth_type)
.with_context(|| "Failed to create config file")?;
}
@@ -132,9 +146,12 @@ fn main() -> Result<()> {
};
match matches.subcommand() {
("init", Some(_)) => {
("init", Some(init_match)) => {
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.init(Some(&env.tenantid.to_string())) {
if let Err(e) = pageserver.init(
Some(&env.tenantid.to_string()),
init_match.is_present("enable-auth"),
) {
eprintln!("pageserver init failed: {}", e);
exit(1);
}
@@ -447,12 +464,19 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let node = cplane.nodes.get(&(tenantid, timeline_name.to_owned()));
let auth_token = if matches!(env.auth_type, AuthType::ZenithJWT) {
let claims = Claims::new(Some(tenantid), Scope::Tenant);
Some(encode_from_key_path(&claims, &env.private_key_path)?)
} else {
None
};
println!("Starting postgres on timeline {}...", timeline_name);
if let Some(node) = node {
node.start()?;
node.start(&auth_token)?;
} else {
let node = cplane.new_node(tenantid, timeline_name)?;
node.start()?;
node.start(&auth_token)?;
}
}
("stop", Some(stop_match)) => {

View File

@@ -20,6 +20,8 @@ tokio = { version = "1.5.0", features = ["full"] }
zenith_metrics = { path = "../zenith_metrics" }
workspace_hack = { path = "../workspace_hack" }
rand = "0.8.3"
jsonwebtoken = "7"
hex = { version = "0.4.3", features = ["serde"] }
[dev-dependencies]
hex-literal = "0.3"

104
zenith_utils/src/auth.rs Normal file
View File

@@ -0,0 +1,104 @@
// For details about authentication see docs/authentication.md
// TODO there are two issues for our use case in jsonwebtoken library which will be resolved in next release
// The fisrt one is that there is no way to disable expiration claim, but it can be excluded from validation, so use this as a workaround for now.
// Relevant issue: https://github.com/Keats/jsonwebtoken/issues/190
// The second one is that we wanted to use ed25519 keys, but they are also not supported until next version. So we go with RSA keys for now.
// Relevant issue: https://github.com/Keats/jsonwebtoken/issues/162
use hex::{self, FromHex};
use serde::de::Error;
use serde::{self, Deserializer, Serializer};
use std::{fs, path::PathBuf};
use anyhow::Result;
use jsonwebtoken::{
decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation,
};
use serde::{Deserialize, Serialize};
use crate::zid::ZTenantId;
const JWT_ALGORITHM: Algorithm = Algorithm::RS256;
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Scope {
Tenant,
PageServerApi,
}
pub fn to_hex_option<S>(value: &Option<ZTenantId>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match value {
Some(tid) => hex::serialize(tid, serializer),
None => Option::serialize(value, serializer),
}
}
fn from_hex_option<'de, D>(deserializer: D) -> Result<Option<ZTenantId>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<String> = Option::deserialize(deserializer)?;
match opt {
Some(tid) => return Ok(Some(ZTenantId::from_hex(tid).map_err(Error::custom)?)),
None => return Ok(None),
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Claims {
// this custom serialize/deserialize_with is needed because Option is not transparent to serde
// so clearest option is serde(with = "hex") but it is not working, for details see https://github.com/serde-rs/serde/issues/1301
#[serde(
default,
skip_serializing_if = "Option::is_none",
serialize_with = "to_hex_option",
deserialize_with = "from_hex_option"
)]
pub tenant_id: Option<ZTenantId>,
pub scope: Scope,
}
impl Claims {
pub fn new(tenant_id: Option<ZTenantId>, scope: Scope) -> Self {
Self { tenant_id, scope }
}
}
#[derive(Debug)]
pub struct JwtAuth {
decoding_key: DecodingKey<'static>,
validation: Validation,
}
impl JwtAuth {
pub fn new<'a>(decoding_key: DecodingKey<'a>) -> Self {
Self {
decoding_key: decoding_key.into_static(),
validation: Validation {
algorithms: vec![JWT_ALGORITHM],
validate_exp: false,
..Default::default()
},
}
}
pub fn from_key_path(key_path: &PathBuf) -> Result<Self> {
let public_key = fs::read_to_string(key_path)?;
Ok(Self::new(DecodingKey::from_rsa_pem(public_key.as_bytes())?))
}
pub fn decode(&self, token: &str) -> Result<TokenData<Claims>> {
Ok(decode(token, &self.decoding_key, &self.validation)?)
}
}
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn encode_from_key_path(claims: &Claims, key_path: &PathBuf) -> Result<String> {
let key_data = fs::read_to_string(key_path)?;
let key = EncodingKey::from_rsa_pem(&key_data.as_bytes())?;
Ok(encode(&Header::new(JWT_ALGORITHM), claims, &key)?)
}

View File

@@ -16,3 +16,9 @@ pub mod pq_proto;
// dealing with connstring parsing and handy access to it's parts
pub mod connstring;
// common authentication routines
pub mod auth;
// utility functions and helper traits for unified unique id generation/serialization etc.
pub mod zid;

View File

@@ -4,12 +4,14 @@
//! is rather narrow, but we can extend it once required.
use crate::pq_proto::{BeMessage, FeMessage, FeStartupMessage, StartupRequestCode};
use anyhow::{bail, Result};
use anyhow::{bail, ensure, Result};
use bytes::{Bytes, BytesMut};
use log::*;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::io::{self, BufReader, Write};
use std::net::{Shutdown, TcpStream};
use std::str::FromStr;
pub trait Handler {
/// Handle single query.
@@ -27,9 +29,14 @@ pub trait Handler {
Ok(())
}
/// Check auth
/// Check auth md5
fn check_auth_md5(&mut self, _pgb: &mut PostgresBackend, _md5_response: &[u8]) -> Result<()> {
bail!("Auth failed")
bail!("MD5 auth failed")
}
/// Check auth jwt
fn check_auth_jwt(&mut self, _pgb: &mut PostgresBackend, _jwt_response: &[u8]) -> Result<()> {
bail!("JWT auth failed")
}
}
@@ -42,10 +49,25 @@ pub enum ProtoState {
Established,
}
#[derive(Clone, Copy, PartialEq)]
#[derive(Debug, PartialEq, Clone, Copy, Serialize, Deserialize)]
pub enum AuthType {
Trust,
MD5,
// This mimics postgres's AuthenticationCleartextPassword but instead of password expects JWT
ZenithJWT,
}
impl FromStr for AuthType {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"Trust" => Ok(Self::Trust),
"MD5" => Ok(Self::MD5),
"ZenithJWT" => Ok(Self::ZenithJWT),
_ => bail!("invalid value \"{}\" for auth type", s),
}
}
}
#[derive(Clone, Copy)]
@@ -125,7 +147,7 @@ impl PostgresBackend {
use ProtoState::*;
match state {
ProtoState::Initialization => FeStartupMessage::read(stream),
Initialization => FeStartupMessage::read(stream),
Authentication | Established => FeMessage::read(stream),
}
}
@@ -184,13 +206,13 @@ impl PostgresBackend {
// Allow only startup and password messages during auth. Otherwise client would be able to bypass auth
// TODO: change that to proper top-level match of protocol state with separate message handling for each state
if self.state < ProtoState::Established {
match &msg {
FeMessage::PasswordMessage(_m) => {}
FeMessage::StartupMessage(_m) => {}
_ => {
bail!("protocol violation");
}
}
ensure!(
matches!(
msg,
FeMessage::PasswordMessage(_) | FeMessage::StartupMessage(_)
),
"protocol violation"
);
}
match msg {
@@ -224,6 +246,10 @@ impl PostgresBackend {
))?;
self.state = ProtoState::Authentication;
}
AuthType::ZenithJWT => {
self.write_message(&BeMessage::AuthenticationCleartextPassword)?;
self.state = ProtoState::Authentication;
}
}
}
StartupRequestCode::Cancel => {
@@ -237,21 +263,35 @@ impl PostgresBackend {
assert!(self.state == ProtoState::Authentication);
let (_, md5_response) = m
.split_last()
.ok_or_else(|| anyhow::Error::msg("protocol violation"))?;
match self.auth_type {
AuthType::Trust => unreachable!(),
AuthType::MD5 => {
let (_, md5_response) = m
.split_last()
.ok_or_else(|| anyhow::Error::msg("protocol violation"))?;
if let Err(e) = handler.check_auth_md5(self, md5_response) {
self.write_message(&BeMessage::ErrorResponse(format!("{}", e)))?;
bail!("auth failed: {}", e);
} else {
self.write_message_noflush(&BeMessage::AuthenticationOk)?;
// psycopg2 will not connect if client_encoding is not
// specified by the server
self.write_message_noflush(&BeMessage::ParameterStatus)?;
self.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
if let Err(e) = handler.check_auth_md5(self, md5_response) {
self.write_message(&BeMessage::ErrorResponse(format!("{}", e)))?;
bail!("auth failed: {}", e);
}
}
AuthType::ZenithJWT => {
let (_, jwt_response) = m
.split_last()
.ok_or_else(|| anyhow::Error::msg("protocol violation"))?;
if let Err(e) = handler.check_auth_jwt(self, jwt_response) {
self.write_message(&BeMessage::ErrorResponse(format!("{}", e)))?;
bail!("auth failed: {}", e);
}
}
}
self.write_message_noflush(&BeMessage::AuthenticationOk)?;
// psycopg2 will not connect if client_encoding is not
// specified by the server
self.write_message_noflush(&BeMessage::ParameterStatus)?;
self.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
}
FeMessage::Query(m) => {

View File

@@ -329,6 +329,7 @@ fn read_null_terminated(buf: &mut Bytes) -> anyhow::Result<Bytes> {
pub enum BeMessage<'a> {
AuthenticationOk,
AuthenticationMD5Password(&'a [u8; 4]),
AuthenticationCleartextPassword,
BindComplete,
CommandComplete(&'a [u8]),
ControlFile,
@@ -471,6 +472,15 @@ impl<'a> BeMessage<'a> {
.unwrap(); // write into BytesMut can't fail
}
BeMessage::AuthenticationCleartextPassword => {
buf.put_u8(b'R');
write_body(buf, |buf| {
buf.put_i32(3); // Specifies that clear text password is required.
Ok::<_, io::Error>(())
})
.unwrap(); // write into BytesMut can't fail
}
BeMessage::AuthenticationMD5Password(salt) => {
buf.put_u8(b'R');
write_body(buf, |buf| {

155
zenith_utils/src/zid.rs Normal file
View File

@@ -0,0 +1,155 @@
use std::{fmt, str::FromStr};
use hex::FromHex;
use rand::Rng;
use serde::{Deserialize, Serialize};
// Zenith ID is a 128-bit random ID.
// Used to represent various identifiers. Provides handy utility methods and impls.
// TODO (LizardWizzard) figure out best way to remove boiler plate with trait impls caused by newtype pattern
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
struct ZId([u8; 16]);
impl ZId {
pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZId {
let mut arr = [0u8; 16];
buf.copy_to_slice(&mut arr);
ZId::from(arr)
}
pub fn as_arr(&self) -> [u8; 16] {
self.0
}
pub fn generate() -> Self {
let mut tli_buf = [0u8; 16];
rand::thread_rng().fill(&mut tli_buf);
ZId::from(tli_buf)
}
}
impl FromStr for ZId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<ZId, Self::Err> {
Self::from_hex(s)
}
}
// this is needed for pretty serialization and deserialization of ZId's using serde integration with hex crate
impl FromHex for ZId {
type Error = hex::FromHexError;
fn from_hex<T: AsRef<[u8]>>(hex: T) -> Result<Self, Self::Error> {
let mut buf: [u8; 16] = [0u8; 16];
hex::decode_to_slice(hex, &mut buf)?;
Ok(ZId(buf))
}
}
impl AsRef<[u8]> for ZId {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
impl From<[u8; 16]> for ZId {
fn from(b: [u8; 16]) -> Self {
ZId(b)
}
}
impl fmt::Display for ZId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&hex::encode(self.0))
}
}
macro_rules! zid_newtype {
($t:ident) => {
impl $t {
pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> $t {
$t(ZId::get_from_buf(buf))
}
pub fn as_arr(&self) -> [u8; 16] {
self.0.as_arr()
}
pub fn generate() -> Self {
$t(ZId::generate())
}
}
impl FromStr for $t {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<$t, Self::Err> {
let value = ZId::from_str(s)?;
Ok($t(value))
}
}
impl From<[u8; 16]> for $t {
fn from(b: [u8; 16]) -> Self {
$t(ZId::from(b))
}
}
impl fmt::Display for $t {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
};
}
/// Zenith timeline IDs are different from PostgreSQL timeline
/// IDs. They serve a similar purpose though: they differentiate
/// between different "histories" of the same cluster. However,
/// PostgreSQL timeline IDs are a bit cumbersome, because they are only
/// 32-bits wide, and they must be in ascending order in any given
/// timeline history. Those limitations mean that we cannot generate a
/// new PostgreSQL timeline ID by just generating a random number. And
/// that in turn is problematic for the "pull/push" workflow, where you
/// have a local copy of a zenith repository, and you periodically sync
/// the local changes with a remote server. When you work "detached"
/// from the remote server, you cannot create a PostgreSQL timeline ID
/// that's guaranteed to be different from all existing timelines in
/// the remote server. For example, if two people are having a clone of
/// the repository on their laptops, and they both create a new branch
/// with different name. What timeline ID would they assign to their
/// branches? If they pick the same one, and later try to push the
/// branches to the same remote server, they will get mixed up.
///
/// To avoid those issues, Zenith has its own concept of timelines that
/// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ZTimelineId(ZId);
zid_newtype!(ZTimelineId);
// Zenith Tenant Id represents identifiar of a particular tenant.
// Is used for distinguishing requests and data belonging to different users.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ZTenantId(ZId);
zid_newtype!(ZTenantId);
// for now the following impls are used only with ZTenantId,
// if this impls become useful in other newtypes they can be moved under zid_newtype macro too
impl FromHex for ZTenantId {
type Error = hex::FromHexError;
fn from_hex<T: AsRef<[u8]>>(hex: T) -> Result<Self, Self::Error> {
Ok(ZTenantId(ZId::from_hex(hex)?))
}
}
impl AsRef<[u8]> for ZTenantId {
fn as_ref(&self) -> &[u8] {
&self.0 .0
}
}