diff --git a/Cargo.lock b/Cargo.lock index 1598fede16..67c00293b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,12 @@ dependencies = [ "anyhow", ] +[[package]] +name = "base64" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" + [[package]] name = "base64" version = "0.13.0" @@ -826,6 +832,20 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "7.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afabcc15e437a6484fc4f12d0fd63068fe457bf93f1c148d3d9649c60b103f32" +dependencies = [ + "base64 0.12.3", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1019,6 +1039,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -1178,6 +1209,17 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pem" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd56cbd21fea48d0c440b41cd69c589faacade08c992d9a54e471b79d0fd13eb" +dependencies = [ + "base64 0.13.0", + "once_cell", + "regex", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -1240,7 +1282,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3" dependencies = [ - "base64", + "base64 0.13.0", "byteorder", "bytes", "fallible-iterator", @@ -1257,7 +1299,7 @@ name = 
"postgres-protocol" version = "0.6.1" source = "git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858#9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" dependencies = [ - "base64", + "base64 0.13.0", "byteorder", "bytes", "fallible-iterator", @@ -1486,7 +1528,7 @@ version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22" dependencies = [ - "base64", + "base64 0.13.0", "bytes", "encoding_rs", "futures-core", @@ -1515,6 +1557,21 @@ dependencies = [ "winreg", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rocksdb" version = "0.16.0" @@ -1545,7 +1602,7 @@ dependencies = [ "async-trait", "aws-creds", "aws-region", - "base64", + "base64 0.13.0", "cfg-if 1.0.0", "chrono", "futures", @@ -1740,6 +1797,17 @@ dependencies = [ "libc", ] +[[package]] +name = "simple_asn1" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "692ca13de57ce0613a363c8c2f1de925adebc81b04c923ac60c5488bb44abe4b" +dependencies = [ + "chrono", + "num-bigint", + "num-traits", +] + [[package]] name = "siphasher" version = "0.3.5" @@ -1821,6 +1889,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "stringprep" version = "0.1.2" @@ -2157,6 +2231,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "untrusted" +version = "0.7.1" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.2.2" @@ -2443,8 +2523,10 @@ dependencies = [ "bincode", "byteorder", "bytes", + "hex", "hex-literal", "hyper", + "jsonwebtoken", "lazy_static", "log", "postgres", diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 9bcacb88e7..434f4f167c 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -15,10 +15,11 @@ use anyhow::{Context, Result}; use lazy_static::lazy_static; use regex::Regex; use zenith_utils::connstring::connection_host_port; +use zenith_utils::postgres_backend::AuthType; +use zenith_utils::zid::ZTenantId; +use zenith_utils::zid::ZTimelineId; use crate::local_env::LocalEnv; -use pageserver::{ZTenantId, ZTimelineId}; - use crate::storage::PageServerNode; // @@ -103,7 +104,7 @@ impl ComputeControlPlane { tenantid, }); - node.init_from_page_server()?; + node.init_from_page_server(self.env.auth_type)?; self.nodes .insert((tenantid, node.name.clone()), Arc::clone(&node)); @@ -247,7 +248,7 @@ impl PostgresNode { // Connect to a page server, get base backup, and untar it to initialize a // new data directory - pub fn init_from_page_server(&self) -> Result<()> { + pub fn init_from_page_server(&self, auth_type: AuthType) -> Result<()> { let pgdata = self.pgdata(); println!( @@ -322,18 +323,27 @@ impl PostgresNode { // Connect it to the page server. 
+ // set up authentication + let password = if let AuthType::ZenithJWT = auth_type { + "$ZENITH_AUTH_TOKEN" + } else { + "" + }; + // Configure that node to take pages from pageserver - let (host, port) = connection_host_port(&self.pageserver.connection_config()); + let (host, port) = connection_host_port(&self.pageserver.connection_config); self.append_conf( "postgresql.conf", format!( concat!( "shared_preload_libraries = zenith\n", - "zenith.page_server_connstring = 'host={} port={}'\n", + // $ZENITH_AUTH_TOKEN will be replaced with value from environment variable during compute pg startup + // it is done this way because otherwise user will be able to retrieve the value using SHOW command or pg_settings + "zenith.page_server_connstring = 'host={} port={} password={}'\n", "zenith.zenith_timeline='{}'\n", "zenith.zenith_tenant='{}'\n", ), - host, port, self.timelineid, self.tenantid, + host, port, password, self.timelineid, self.tenantid, ) .as_str(), )?; @@ -368,45 +378,48 @@ impl PostgresNode { Ok(()) } - fn pg_ctl(&self, args: &[&str]) -> Result<()> { + fn pg_ctl(&self, args: &[&str], auth_token: &Option) -> Result<()> { let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl"); + let mut cmd = Command::new(pg_ctl_path); + cmd.args( + [ + &[ + "-D", + self.pgdata().to_str().unwrap(), + "-l", + self.pgdata().join("pg.log").to_str().unwrap(), + "-w", //wait till pg_ctl actually does what was asked + ], + args, + ] + .concat(), + ) + .env_clear() + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()); + + if let Some(token) = auth_token { + cmd.env("ZENITH_AUTH_TOKEN", token); + } + let pg_ctl = cmd.status().with_context(|| "pg_ctl failed")?; - let pg_ctl = Command::new(pg_ctl_path) - .args( - [ - &[ - "-D", - self.pgdata().to_str().unwrap(), - "-l", - self.pgdata().join("pg.log").to_str().unwrap(), - "-w", //wait till pg_ctl actually does what was asked - ], - args, - ] - .concat(), - ) 
- .env_clear() - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .status() - .with_context(|| "pg_ctl failed")?; if !pg_ctl.success() { anyhow::bail!("pg_ctl failed"); } Ok(()) } - pub fn start(&self) -> Result<()> { + pub fn start(&self, auth_token: &Option) -> Result<()> { println!("Starting postgres node at '{}'", self.connstr()); - self.pg_ctl(&["start"]) + self.pg_ctl(&["start"], auth_token) } - pub fn restart(&self) -> Result<()> { - self.pg_ctl(&["restart"]) + pub fn restart(&self, auth_token: &Option) -> Result<()> { + self.pg_ctl(&["restart"], auth_token) } pub fn stop(&self, destroy: bool) -> Result<()> { - self.pg_ctl(&["-m", "immediate", "stop"])?; + self.pg_ctl(&["-m", "immediate", "stop"], &None)?; if destroy { println!( "Destroying postgres data directory '{}'", diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 7f25f105bd..084285cb16 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -4,14 +4,17 @@ // Now it also provides init method which acts like a stub for proper installation // script which will use local paths. 
// -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use hex; -use pageserver::ZTenantId; use serde::{Deserialize, Serialize}; use std::fs; use std::path::PathBuf; +use std::process::{Command, Stdio}; use std::{collections::BTreeMap, env}; use url::Url; +use zenith_utils::auth::{encode_from_key_path, Claims, Scope}; +use zenith_utils::postgres_backend::AuthType; +use zenith_utils::zid::ZTenantId; pub type Remotes = BTreeMap; @@ -39,6 +42,15 @@ pub struct LocalEnv { #[serde(with = "hex")] pub tenantid: ZTenantId, + // jwt auth token used for communication with pageserver + pub auth_token: String, + + // used to determine which auth type is used + pub auth_type: AuthType, + + // used to issue tokens during e.g pg start + pub private_key_path: PathBuf, + pub remotes: Remotes, } @@ -85,7 +97,11 @@ fn base_path() -> PathBuf { // // Initialize a new Zenith repository // -pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()> { +pub fn init( + remote_pageserver: Option<&str>, + tenantid: ZTenantId, + auth_type: AuthType, +) -> Result<()> { // check if config already exists let base_path = base_path(); if base_path.exists() { @@ -94,6 +110,7 @@ pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()> base_path.to_str().unwrap() ); } + fs::create_dir(&base_path)?; // ok, now check that expected binaries are present @@ -110,6 +127,44 @@ pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()> anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir); } + // generate keys for jwt + // openssl genrsa -out private_key.pem 2048 + let private_key_path = base_path.join("auth_private_key.pem"); + let keygen_output = Command::new("openssl") + .arg("genrsa") + .args(&["-out", private_key_path.to_str().unwrap()]) + .arg("2048") + .stdout(Stdio::null()) + .output() + .with_context(|| "failed to generate auth private key")?; + if !keygen_output.status.success() { + anyhow::bail!( + 
"openssl failed: '{}'", + String::from_utf8_lossy(&keygen_output.stderr) + ); + } + + let public_key_path = base_path.join("auth_public_key.pem"); + // openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem + let keygen_output = Command::new("openssl") + .arg("rsa") + .args(&["-in", private_key_path.to_str().unwrap()]) + .arg("-pubout") + .args(&["-outform", "PEM"]) + .args(&["-out", public_key_path.to_str().unwrap()]) + .stdout(Stdio::null()) + .output() + .with_context(|| "failed to generate auth private key")?; + if !keygen_output.status.success() { + anyhow::bail!( + "openssl failed: '{}'", + String::from_utf8_lossy(&keygen_output.stderr) + ); + } + + let auth_token = + encode_from_key_path(&Claims::new(None, Scope::PageServerApi), &private_key_path)?; + let conf = if let Some(addr) = remote_pageserver { // check that addr is parsable let _uri = Url::parse(addr).map_err(|e| anyhow!("{}: {}", addr, e))?; @@ -121,6 +176,9 @@ pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()> base_data_dir: base_path, remotes: BTreeMap::default(), tenantid, + auth_token, + auth_type, + private_key_path, } } else { // Find zenith binaries. 
@@ -136,6 +194,9 @@ pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()> base_data_dir: base_path, remotes: BTreeMap::default(), tenantid, + auth_token, + auth_type, + private_key_path, } }; diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index d12e8d3366..cc576b1c45 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -8,8 +8,9 @@ use std::time::Duration; use anyhow::{anyhow, bail, Result}; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; -use pageserver::ZTenantId; use postgres::{Config, NoTls}; +use zenith_utils::postgres_backend::AuthType; +use zenith_utils::zid::ZTenantId; use crate::local_env::LocalEnv; use crate::read_pidfile; @@ -24,33 +25,32 @@ use zenith_utils::connstring::connection_address; #[derive(Debug)] pub struct PageServerNode { pub kill_on_exit: bool, - pub connection_config: Option, + pub connection_config: Config, pub env: LocalEnv, } impl PageServerNode { pub fn from_env(env: &LocalEnv) -> PageServerNode { + let password = if matches!(env.auth_type, AuthType::ZenithJWT) { + &env.auth_token + } else { + "" + }; + PageServerNode { kill_on_exit: false, - connection_config: None, // default + connection_config: Self::default_config(password), // default env: env.clone(), } } - fn default_config() -> Config { - "postgresql://no_user@localhost:64000/no_db" + fn default_config(password: &str) -> Config { + format!("postgresql://no_user:{}@localhost:64000/no_db", password) .parse() .unwrap() } - pub fn connection_config(&self) -> Config { - match &self.connection_config { - Some(config) => config.clone(), - None => Self::default_config(), - } - } - - pub fn init(&self, create_tenant: Option<&str>) -> Result<()> { + pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> Result<()> { let mut cmd = Command::new(self.env.pageserver_bin()?); let mut args = vec![ "--init", @@ -59,6 +59,12 @@ impl PageServerNode { "--postgres-distrib", 
self.env.pg_distrib_dir.to_str().unwrap(), ]; + + if enable_auth { + args.extend(&["--auth-validation-public-key-path", "auth_public_key.pem"]); + args.extend(&["--auth-type", "ZenithJWT"]); + } + create_tenant.map(|tenantid| args.extend(&["--create-tenant", tenantid])); let status = cmd .args(args) @@ -85,7 +91,7 @@ impl PageServerNode { pub fn start(&self) -> Result<()> { println!( "Starting pageserver at '{}' in {}", - connection_address(&self.connection_config()), + connection_address(&self.connection_config), self.repo_path().display() ); @@ -105,18 +111,21 @@ impl PageServerNode { // It takes a while for the page server to start up. Wait until it is // open for business. for retries in 1..15 { - let client = self.page_server_psql_client(); - if client.is_ok() { - break; - } else { - println!("Pageserver not responding yet, retrying ({})...", retries); - thread::sleep(Duration::from_secs(1)); + match self.page_server_psql_client() { + Ok(_) => { + println!("Pageserver started"); + return Ok(()); + } + Err(err) => { + println!( + "Pageserver not responding yet, err {} retrying ({})...", + err, retries + ); + thread::sleep(Duration::from_secs(1)); + } } } - - println!("Pageserver started"); - - Ok(()) + bail!("pageserver failed to start"); } pub fn stop(&self) -> Result<()> { @@ -127,7 +136,7 @@ impl PageServerNode { } // wait for pageserver stop - let address = connection_address(&self.connection_config()); + let address = connection_address(&self.connection_config); for _ in 0..5 { let stream = TcpStream::connect(&address); thread::sleep(Duration::from_secs(1)); @@ -142,14 +151,14 @@ impl PageServerNode { } pub fn page_server_psql(&self, sql: &str) -> Vec { - let mut client = self.connection_config().connect(NoTls).unwrap(); + let mut client = self.connection_config.connect(NoTls).unwrap(); println!("Pageserver query: '{}'", sql); client.simple_query(sql).unwrap() } pub fn page_server_psql_client(&self) -> Result { - self.connection_config().connect(NoTls) + 
self.connection_config.connect(NoTls) } pub fn tenants_list(&self) -> Result> { diff --git a/docs/authentication.md b/docs/authentication.md new file mode 100644 index 0000000000..de408624ae --- /dev/null +++ b/docs/authentication.md @@ -0,0 +1,30 @@ +## Authentication + +### Overview + +The current state of authentication includes the usage of JWT tokens in communication between compute and pageserver and between the CLI and pageserver. The JWT token is signed using RSA keys. The CLI generates a key pair during the call to `zenith init`, using the following openssl commands: + +```bash +openssl genrsa -out private_key.pem 2048 +openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem +``` + +The CLI also generates a signed token and saves it in the config for later access to the pageserver. Authentication is currently optional. The pageserver has two variables in its config: `auth_validation_public_key_path` and `auth_type`, so when the auth type is present and set to `ZenithJWT`, the pageserver will require authentication for connections. The actual JWT is passed in the password field of the connection string. There is a caveat for psql: it silently truncates passwords to 100 symbols, so to correctly pass a JWT via psql you have to either use the PGPASSWORD environment variable or store the password in the psql config file. + +Currently there is no authentication between compute and safekeepers, because this communication layer is under heavy refactoring. After this refactoring, support for authentication will be added there too. For now the safekeeper supports a "hardcoded" token passed via an environment variable, to be able to use the callmemaybe command in the pageserver. + +Compute uses a token passed via an environment variable to communicate with the pageserver and, in the future, with the safekeeper too. + +JWT authentication currently supports two scopes: tenant and pageserverapi. The tenant scope is intended for use in tenant-related API calls, e.g. create_branch. A compute launched for a particular tenant also uses this scope. 
The pageserverapi scope is intended to be used by the console to manage the pageserver. For now we have only one management operation - create tenant. + +Examples of token generation in Python: + +```python +# generate pageserverapi token +management_token = jwt.encode({"scope": "pageserverapi"}, auth_keys.priv, algorithm="RS256") + +# generate tenant token +tenant_token = jwt.encode({"scope": "tenant", "tenant_id": ps.initial_tenant}, auth_keys.priv, algorithm="RS256") +``` + +Utility functions for working with JWTs in Rust are located in zenith_utils/src/auth.rs diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index f40dd00e55..914c8858ca 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -9,11 +9,14 @@ use std::{ net::TcpListener, path::{Path, PathBuf}, process::exit, + str::FromStr, + sync::Arc, thread, time::Duration, }; +use zenith_utils::{auth::JwtAuth, postgres_backend::AuthType}; -use anyhow::Result; +use anyhow::{ensure, Result}; use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; @@ -36,6 +39,8 @@ struct CfgFileParams { gc_horizon: Option, gc_period: Option, pg_distrib_dir: Option, + auth_validation_public_key_path: Option, + auth_type: Option, } impl CfgFileParams { @@ -51,6 +56,8 @@ impl CfgFileParams { gc_horizon: get_arg("gc_horizon"), gc_period: get_arg("gc_period"), pg_distrib_dir: get_arg("postgres-distrib"), + auth_validation_public_key_path: get_arg("auth-validation-public-key-path"), + auth_type: get_arg("auth-type"), } } @@ -63,11 +70,17 @@ impl CfgFileParams { gc_horizon: self.gc_horizon.or(other.gc_horizon), gc_period: self.gc_period.or(other.gc_period), pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir), + auth_validation_public_key_path: self + .auth_validation_public_key_path + .or(other.auth_validation_public_key_path), + auth_type: self.auth_type.or(other.auth_type), } } /// Create a PageServerConf from these string parameters fn try_into_config(&self) -> Result { 
+ let workdir = PathBuf::from("."); + let listen_addr = match self.listen_addr.as_ref() { Some(addr) => addr.clone(), None => DEFAULT_LISTEN_ADDR.to_owned(), @@ -92,10 +105,34 @@ impl CfgFileParams { None => env::current_dir()?.join("tmp_install"), }; + let auth_validation_public_key_path = self + .auth_validation_public_key_path + .as_ref() + .map(PathBuf::from); + + let auth_type = self + .auth_type + .as_ref() + .map_or(Ok(AuthType::Trust), |auth_type| { + AuthType::from_str(&auth_type) + })?; + if !pg_distrib_dir.join("bin/postgres").exists() { anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir); } + if auth_type == AuthType::ZenithJWT { + ensure!( + auth_validation_public_key_path.is_some(), + "Missing auth_validation_public_key_path when auth_type is ZenithJWT" + ); + let path_ref = auth_validation_public_key_path.as_ref().unwrap(); + ensure!( + path_ref.exists(), + format!("Can't find auth_validation_public_key at {:?}", path_ref) + ); + } + Ok(PageServerConf { daemonize: false, @@ -106,9 +143,13 @@ impl CfgFileParams { superuser: String::from(DEFAULT_SUPERUSER), - workdir: PathBuf::from("."), + workdir, pg_distrib_dir, + + auth_validation_public_key_path, + + auth_type, }) } } @@ -168,6 +209,18 @@ fn main() -> Result<()> { .help("Create tenant during init") .requires("init"), ) + .arg( + Arg::with_name("auth-validation-public-key-path") + .long("auth-validation-public-key-path") + .takes_value(true) + .help("Path to public key used to validate jwt signature"), + ) + .arg( + Arg::with_name("auth-type") + .long("auth-type") + .takes_value(true) + .help("Authentication scheme type. 
One of: Trust, MD5, ZenithJWT"), + ) .get_matches(); let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith")); @@ -188,6 +241,9 @@ fn main() -> Result<()> { args_params.or(file_params) }; + // Set CWD to workdir for non-daemon modes + env::set_current_dir(&workdir)?; + // Ensure the config is valid, even if just init-ing let mut conf = params.try_into_config()?; @@ -205,17 +261,15 @@ fn main() -> Result<()> { // Create repo and exit if init was requested if init { - branches::init_pageserver(conf, workdir, create_tenant)?; + branches::init_pageserver(conf, create_tenant)?; // write the config file let cfg_file_contents = toml::to_string_pretty(¶ms)?; + // TODO support enable-auth flag std::fs::write(&cfg_file_path, cfg_file_contents)?; return Ok(()); } - // Set CWD to workdir for non-daemon modes - env::set_current_dir(&workdir)?; - start_pageserver(conf) } @@ -262,11 +316,23 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { // Initialize page cache, this will spawn walredo_thread page_cache::init(conf); + // initialize authentication for incoming connections + let auth = match &conf.auth_type { + AuthType::Trust | AuthType::MD5 => Arc::new(None), + AuthType::ZenithJWT => { + // unwrap is ok because check is performed when creating config, so path is set and file exists + let key_path = conf.auth_validation_public_key_path.as_ref().unwrap(); + Arc::new(Some(JwtAuth::from_key_path(key_path)?)) + } + }; + info!("Using auth: {:#?}", conf.auth_type); // Spawn a thread to listen for connections. It will spawn further threads // for each connection. 
let page_service_thread = thread::Builder::new() .name("Page Service thread".into()) - .spawn(move || page_service::thread_main(conf, pageserver_listener))?; + .spawn(move || { + page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) + })?; page_service_thread .join() diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index bfd28d5038..2739312326 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -7,7 +7,6 @@ use anyhow::{bail, ensure, Context, Result}; use postgres_ffi::ControlFileData; use serde::{Deserialize, Serialize}; -use std::env; use std::{ fs, path::Path, @@ -15,6 +14,7 @@ use std::{ str::FromStr, sync::Arc, }; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; use log::*; use zenith_utils::lsn::Lsn; @@ -24,8 +24,7 @@ use crate::object_repository::ObjectRepository; use crate::page_cache; use crate::restore_local_repo; use crate::walredo::WalRedoManager; -use crate::ZTenantId; -use crate::{repository::Repository, PageServerConf, ZTimelineId}; +use crate::{repository::Repository, PageServerConf}; #[derive(Serialize, Deserialize, Clone)] pub struct BranchInfo { @@ -42,13 +41,7 @@ pub struct PointInTime { pub lsn: Lsn, } -pub fn init_pageserver( - conf: &'static PageServerConf, - workdir: &Path, - create_tenant: Option<&str>, -) -> Result<()> { - env::set_current_dir(workdir)?; - +pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str>) -> Result<()> { // Initialize logger let (_scope_guard, _log_file) = logger::init_logging(&conf, "pageserver.log")?; let _log_guard = slog_stdlog::init()?; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 172c3145da..8354426c22 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,12 +1,10 @@ -use std::fmt; +use zenith_utils::postgres_backend::AuthType; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; + use std::path::PathBuf; -use std::str::FromStr; use std::time::Duration; -use hex::FromHex; use 
lazy_static::lazy_static; -use rand::Rng; -use serde::{Deserialize, Serialize}; use zenith_metrics::{register_int_gauge_vec, IntGaugeVec}; pub mod basebackup; @@ -52,6 +50,10 @@ pub struct PageServerConf { pub workdir: PathBuf, pub pg_distrib_dir: PathBuf, + + pub auth_type: AuthType, + + pub auth_validation_public_key_path: Option, } impl PageServerConf { @@ -111,178 +113,3 @@ impl PageServerConf { self.pg_distrib_dir.join("lib") } } - -// Zenith ID is a 128-bit random ID. -// Used to represent various identifiers. Provides handy utility methods and impls. -// TODO (LizardWizzard) figure out best way to remove boiler plate with trait impls caused by newtype pattern -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] -struct ZId([u8; 16]); - -impl ZId { - pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZId { - let mut arr = [0u8; 16]; - buf.copy_to_slice(&mut arr); - ZId::from(arr) - } - - pub fn as_arr(&self) -> [u8; 16] { - self.0 - } - - pub fn generate() -> Self { - let mut tli_buf = [0u8; 16]; - rand::thread_rng().fill(&mut tli_buf); - ZId::from(tli_buf) - } -} - -impl FromStr for ZId { - type Err = hex::FromHexError; - - fn from_str(s: &str) -> Result { - Self::from_hex(s) - } -} - -// this is needed for pretty serialization and deserialization of ZId's using serde integration with hex crate -impl FromHex for ZId { - type Error = hex::FromHexError; - - fn from_hex>(hex: T) -> Result { - let mut buf: [u8; 16] = [0u8; 16]; - hex::decode_to_slice(hex, &mut buf)?; - Ok(ZId(buf)) - } -} - -impl AsRef<[u8]> for ZId { - fn as_ref(&self) -> &[u8] { - &self.0 - } -} - -impl From<[u8; 16]> for ZId { - fn from(b: [u8; 16]) -> Self { - ZId(b) - } -} - -impl fmt::Display for ZId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&hex::encode(self.0)) - } -} - -/// Zenith timeline IDs are different from PostgreSQL timeline -/// IDs. 
They serve a similar purpose though: they differentiate -/// between different "histories" of the same cluster. However, -/// PostgreSQL timeline IDs are a bit cumbersome, because they are only -/// 32-bits wide, and they must be in ascending order in any given -/// timeline history. Those limitations mean that we cannot generate a -/// new PostgreSQL timeline ID by just generating a random number. And -/// that in turn is problematic for the "pull/push" workflow, where you -/// have a local copy of a zenith repository, and you periodically sync -/// the local changes with a remote server. When you work "detached" -/// from the remote server, you cannot create a PostgreSQL timeline ID -/// that's guaranteed to be different from all existing timelines in -/// the remote server. For example, if two people are having a clone of -/// the repository on their laptops, and they both create a new branch -/// with different name. What timeline ID would they assign to their -/// branches? If they pick the same one, and later try to push the -/// branches to the same remote server, they will get mixed up. -/// -/// To avoid those issues, Zenith has its own concept of timelines that -/// is separate from PostgreSQL timelines, and doesn't have those -/// limitations. A zenith timeline is identified by a 128-bit ID, which -/// is usually printed out as a hex string. 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct ZTimelineId(ZId); - -impl FromStr for ZTimelineId { - type Err = hex::FromHexError; - - fn from_str(s: &str) -> Result { - let value = ZId::from_str(s)?; - Ok(ZTimelineId(value)) - } -} - -impl From<[u8; 16]> for ZTimelineId { - fn from(b: [u8; 16]) -> Self { - ZTimelineId(ZId::from(b)) - } -} - -impl ZTimelineId { - pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTimelineId { - ZTimelineId(ZId::get_from_buf(buf)) - } - - pub fn as_arr(&self) -> [u8; 16] { - self.0.as_arr() - } - - pub fn generate() -> Self { - ZTimelineId(ZId::generate()) - } -} - -impl fmt::Display for ZTimelineId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - -// Zenith Tenant Id represents identifiar of a particular tenant. -// Is used for distinguishing requests and data belonging to different users. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] -pub struct ZTenantId(ZId); - -impl FromStr for ZTenantId { - type Err = hex::FromHexError; - - fn from_str(s: &str) -> Result { - let value = ZId::from_str(s)?; - Ok(ZTenantId(value)) - } -} - -impl From<[u8; 16]> for ZTenantId { - fn from(b: [u8; 16]) -> Self { - ZTenantId(ZId::from(b)) - } -} - -impl FromHex for ZTenantId { - type Error = hex::FromHexError; - - fn from_hex>(hex: T) -> Result { - Ok(ZTenantId(ZId::from_hex(hex)?)) - } -} - -impl AsRef<[u8]> for ZTenantId { - fn as_ref(&self) -> &[u8] { - &self.0 .0 - } -} - -impl ZTenantId { - pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTenantId { - ZTenantId(ZId::get_from_buf(buf)) - } - - pub fn as_arr(&self) -> [u8; 16] { - self.0.as_arr() - } - - pub fn generate() -> Self { - ZTenantId(ZId::generate()) - } -} - -impl fmt::Display for ZTenantId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} diff --git a/pageserver/src/object_key.rs b/pageserver/src/object_key.rs index 
bcae9e335b..42f6156c78 100644 --- a/pageserver/src/object_key.rs +++ b/pageserver/src/object_key.rs @@ -3,8 +3,8 @@ //! use crate::relish::RelishTag; -use crate::ZTimelineId; use serde::{Deserialize, Serialize}; +use zenith_utils::zid::ZTimelineId; /// /// ObjectKey is the key type used to identify objects stored in an object diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 5d9487d6be..f48a1af579 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -13,13 +13,13 @@ //! until we find the page we're looking for, making a separate lookup into the //! key-value store for each timeline. +use crate::object_key::*; use crate::object_store::ObjectStore; use crate::relish::*; use crate::repository::*; use crate::restore_local_repo::import_timeline_wal; use crate::walredo::WalRedoManager; -use crate::{object_key::*, ZTenantId}; -use crate::{PageServerConf, ZTimelineId}; +use crate::PageServerConf; use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; use log::*; @@ -33,6 +33,8 @@ use std::time::{Duration, Instant}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::{AtomicLsn, Lsn}; use zenith_utils::seqwait::SeqWait; +use zenith_utils::zid::ZTenantId; +use zenith_utils::zid::ZTimelineId; /// /// A repository corresponds to one .zenith directory. One repository holds multiple @@ -273,11 +275,13 @@ impl Timeline for ObjectTimeline { // Handle truncated SLRU segments. // XXX if this will turn out to be performance critical, // move this check out of the funciton. 
- if let RelishTag::Slru{slru: _slru, segno: _segno} = rel + if let RelishTag::Slru { + slru: _slru, + segno: _segno, + } = rel { info!("test SLRU rel {:?} at {}", rel, req_lsn); - if !self.get_rel_exists(rel, req_lsn).unwrap_or(false) - { + if !self.get_rel_exists(rel, req_lsn).unwrap_or(false) { info!("SLRU rel {:?} at {} doesn't exist", rel, req_lsn); return Err(anyhow!("SLRU rel doesn't exist")); } @@ -350,9 +354,7 @@ impl Timeline for ObjectTimeline { /// Does relation exist at given LSN? fn get_rel_exists(&self, rel: RelishTag, req_lsn: Lsn) -> Result { - - if let Some(_) = self.get_relish_size(rel, req_lsn)? - { + if let Some(_) = self.get_relish_size(rel, req_lsn)? { trace!("Relation {} exists at {}", rel, req_lsn); return Ok(true); } @@ -714,9 +716,11 @@ impl Timeline for ObjectTimeline { for vers in self.obj_store.object_versions(&key, horizon)? { let lsn = vers.0; prepared_horizon = Lsn::min(lsn, prepared_horizon); - if !self.get_tx_is_in_progress(xid, horizon) - { - info!("unlink twophase_file NOT TRANSACTION_STATUS_IN_PROGRESS {}", xid); + if !self.get_tx_is_in_progress(xid, horizon) { + info!( + "unlink twophase_file NOT TRANSACTION_STATUS_IN_PROGRESS {}", + xid + ); self.obj_store.unlink(&key, lsn)?; result.prep_deleted += 1; } diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs index b3927ba325..10eb3fcea6 100644 --- a/pageserver/src/object_store.rs +++ b/pageserver/src/object_store.rs @@ -2,11 +2,11 @@ //! use crate::object_key::*; use crate::relish::*; -use crate::ZTimelineId; use anyhow::Result; use std::collections::HashSet; use std::iter::Iterator; use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZTimelineId; /// /// Low-level storage abstraction. 
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 1467afeefc..db24df861f 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -6,7 +6,7 @@ use crate::object_repository::ObjectRepository; use crate::repository::Repository; use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::PostgresRedoManager; -use crate::{PageServerConf, ZTenantId}; +use crate::PageServerConf; use anyhow::{anyhow, bail, Result}; use lazy_static::lazy_static; use log::info; @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::fs; use std::str::FromStr; use std::sync::{Arc, Mutex}; +use zenith_utils::zid::ZTenantId; lazy_static! { pub static ref REPOSITORY: Mutex>> = diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 5a35c25e7c..b070d3dad8 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -10,22 +10,27 @@ // *callmemaybe $url* -- ask pageserver to start walreceiver on $url // -use anyhow::{anyhow, bail, ensure}; +use anyhow::{anyhow, bail, ensure, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use log::*; use regex::Regex; use std::io::Write; use std::net::TcpListener; +use std::str; use std::str::FromStr; +use std::sync::Arc; use std::thread; use std::{io, net::TcpStream}; use zenith_metrics::{register_histogram_vec, HistogramVec}; +use zenith_utils::auth::JwtAuth; +use zenith_utils::auth::{Claims, Scope}; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::postgres_backend::{self, AuthType}; use zenith_utils::pq_proto::{ BeMessage, FeMessage, RowDescriptor, HELLO_WORLD_ROW, SINGLE_COL_ROWDESC, }; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; use zenith_utils::{bin_ser::BeSer, lsn::Lsn}; use crate::basebackup; @@ -35,8 +40,6 @@ use crate::relish::*; use crate::repository::Modification; use crate::walreceiver; use crate::PageServerConf; -use crate::ZTenantId; -use crate::ZTimelineId; // Wrapped in libpq 
CopyData enum PagestreamFeMessage { @@ -140,22 +143,32 @@ impl PagestreamBeMessage { /// /// Listens for connections, and launches a new handler thread for each. /// -pub fn thread_main(conf: &'static PageServerConf, listener: TcpListener) -> anyhow::Result<()> { +pub fn thread_main( + conf: &'static PageServerConf, + auth: Arc>, + listener: TcpListener, + auth_type: AuthType, +) -> anyhow::Result<()> { loop { let (socket, peer_addr) = listener.accept()?; debug!("accepted connection from {}", peer_addr); socket.set_nodelay(true).unwrap(); - + let local_auth = Arc::clone(&auth); thread::spawn(move || { - if let Err(err) = page_service_conn_main(conf, socket) { + if let Err(err) = page_service_conn_main(conf, local_auth, socket, auth_type) { error!("error: {}", err); } }); } } -fn page_service_conn_main(conf: &'static PageServerConf, socket: TcpStream) -> anyhow::Result<()> { - // Immediately increment the gauge, then create a job to decrement it on thread exit. +fn page_service_conn_main( + conf: &'static PageServerConf, + auth: Arc>, + socket: TcpStream, + auth_type: AuthType, +) -> anyhow::Result<()> { + // Immediatsely increment the gauge, then create a job to decrement it on thread exit. // One of the pros of `defer!` is that this will *most probably* // get called, even in presence of panics. let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]); @@ -164,14 +177,16 @@ fn page_service_conn_main(conf: &'static PageServerConf, socket: TcpStream) -> a gauge.dec(); } - let mut conn_handler = PageServerHandler::new(conf); - let pgbackend = PostgresBackend::new(socket, AuthType::Trust)?; + let mut conn_handler = PageServerHandler::new(conf, auth); + let pgbackend = PostgresBackend::new(socket, auth_type)?; pgbackend.run(&mut conn_handler) } #[derive(Debug)] struct PageServerHandler { conf: &'static PageServerConf, + auth: Arc>, + claims: Option, } const TIME_BUCKETS: &[f64] = &[ @@ -193,8 +208,12 @@ lazy_static! 
{ } impl PageServerHandler { - pub fn new(conf: &'static PageServerConf) -> Self { - PageServerHandler { conf } + pub fn new(conf: &'static PageServerConf, auth: Arc>) -> Self { + PageServerHandler { + conf, + auth, + claims: None, + } } fn handle_controlfile(&self, pgb: &mut PostgresBackend) -> io::Result<()> { @@ -355,9 +374,68 @@ impl PageServerHandler { Ok(()) } + + // when accessing management api supply None as an argument + // when using to authorize tenant pass corresponding tenant id + fn check_permission(&self, tenantid: Option) -> Result<()> { + if self.auth.is_none() { + // auth is set to Trust, nothing to check so just return ok + return Ok(()); + } + // auth is some, just checked above, when auth is some + // then claims are always present because of checks during connetion init + // so this expect won't trigger + let claims = self + .claims + .as_ref() + .expect("claims presence already checked"); + match (&claims.scope, tenantid) { + (Scope::Tenant, None) => { + bail!("Attempt to access management api with tenant scope. Permission denied") + } + (Scope::Tenant, Some(tenantid)) => { + if claims.tenant_id.unwrap() != tenantid { + bail!("Tenant id mismatch. 
Permission denied") + } + Ok(()) + } + (Scope::PageServerApi, None) => Ok(()), // access to management api for PageServerApi scope + (Scope::PageServerApi, Some(_)) => Ok(()), // access to tenant api using PageServerApi scope + } + } } impl postgres_backend::Handler for PageServerHandler { + fn check_auth_jwt( + &mut self, + _pgb: &mut PostgresBackend, + jwt_response: &[u8], + ) -> anyhow::Result<()> { + // this unwrap is never triggered, because check_auth_jwt only called when auth_type is ZenithJWT + // which requires auth to be present + let data = self + .auth + .as_ref() + .as_ref() + .unwrap() + .decode(&str::from_utf8(jwt_response)?)?; + + if matches!(data.claims.scope, Scope::Tenant) { + ensure!( + data.claims.tenant_id.is_some(), + "jwt token scope is Tenant, but tenant id is missing" + ) + } + + info!( + "jwt auth succeeded for scope: {:#?} by tenantid: {:?}", + data.claims.scope, data.claims.tenant_id, + ); + + self.claims = Some(data.claims); + Ok(()) + } + fn process_query( &mut self, pgb: &mut PostgresBackend, @@ -384,6 +462,8 @@ impl postgres_backend::Handler for PageServerHandler { let tenantid = ZTenantId::from_str(params[0])?; let timelineid = ZTimelineId::from_str(params[1])?; + self.check_permission(Some(tenantid))?; + self.handle_pagerequests(pgb, timelineid, tenantid)?; } else if query_string.starts_with("basebackup ") { let (_, params_raw) = query_string.split_at("basebackup ".len()); @@ -396,6 +476,8 @@ impl postgres_backend::Handler for PageServerHandler { let tenantid = ZTenantId::from_str(params[0])?; let timelineid = ZTimelineId::from_str(params[1])?; + self.check_permission(Some(tenantid))?; + // TODO are there any tests with lsn option? let lsn = if params.len() == 3 { Some(Lsn::from_str(params[2])?) 
@@ -422,6 +504,8 @@ impl postgres_backend::Handler for PageServerHandler { let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; let connstr = caps.get(3).unwrap().as_str().to_owned(); + self.check_permission(Some(tenantid))?; + // Check that the timeline exists let repository = page_cache::get_repository_for_tenant(&tenantid)?; if repository.get_timeline(timelineid).is_err() { @@ -445,6 +529,8 @@ impl postgres_backend::Handler for PageServerHandler { let branchname = caps.get(2).ok_or_else(err)?.as_str().to_owned(); let startpoint_str = caps.get(3).ok_or_else(err)?.as_str().to_owned(); + self.check_permission(Some(tenantid))?; + let branch = branches::create_branch(&self.conf, &branchname, &startpoint_str, &tenantid)?; let branch = serde_json::to_vec(&branch)?; @@ -463,6 +549,8 @@ impl postgres_backend::Handler for PageServerHandler { let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + self.check_permission(Some(tenantid))?; + let start_lsn = Lsn(0); // TODO this needs to come from the repo let timeline = page_cache::get_repository_for_tenant(&tenantid)? 
.create_empty_timeline(timelineid, start_lsn)?; @@ -505,6 +593,8 @@ impl postgres_backend::Handler for PageServerHandler { let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; let postgres_connection_uri = caps.get(3).unwrap().as_str(); + self.check_permission(Some(tenantid))?; + let timeline = page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?; @@ -551,6 +641,8 @@ impl postgres_backend::Handler for PageServerHandler { let re = Regex::new(r"^tenant_create ([[:xdigit:]]+)$").unwrap(); let caps = re.captures(&query_string).ok_or_else(err)?; + self.check_permission(None)?; + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; page_cache::create_repository_for_tenant(&self.conf, tenantid)?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 68ad07fdd8..167fdc51af 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,6 +1,5 @@ use crate::object_key::*; use crate::relish::*; -use crate::ZTimelineId; use anyhow::Result; use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_ffi::nonrelfile_utils::transaction_id_get_status; @@ -12,6 +11,7 @@ use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZTimelineId; /// /// A repository corresponds to one .zenith directory. One repository holds multiple @@ -258,13 +258,15 @@ mod tests { use crate::object_repository::{ObjectValue, PageEntry, RelationSizeEntry}; use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::{WalRedoError, WalRedoManager}; - use crate::{PageServerConf, ZTenantId}; + use crate::PageServerConf; use postgres_ffi::pg_constants; use std::fs; use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; use zenith_utils::bin_ser::BeSer; + use zenith_utils::postgres_backend::AuthType; + use zenith_utils::zid::ZTenantId; /// Arbitrary relation tag, for testing. 
const TESTREL_A: RelishTag = RelishTag::Relation(RelTag { @@ -304,6 +306,8 @@ mod tests { superuser: "zenith_admin".to_string(), workdir: repo_dir, pg_distrib_dir: "".into(), + auth_type: AuthType::Trust, + auth_validation_public_key_path: None, }; // Make a static copy of the config. This can never be free'd, but that's // OK in a test. diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs index 92c36f8567..41ed86d76f 100644 --- a/pageserver/src/rocksdb_storage.rs +++ b/pageserver/src/rocksdb_storage.rs @@ -5,14 +5,14 @@ use crate::object_key::*; use crate::object_store::ObjectStore; use crate::relish::*; use crate::PageServerConf; -use crate::ZTenantId; -use crate::ZTimelineId; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::sync::{Arc, Mutex}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZTenantId; +use zenith_utils::zid::ZTimelineId; #[derive(Debug, Clone, Serialize, Deserialize)] struct StorageKey { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 4da99d0918..8ae690a251 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -9,8 +9,6 @@ use crate::relish::*; use crate::restore_local_repo; use crate::waldecoder::*; use crate::PageServerConf; -use crate::ZTenantId; -use crate::ZTimelineId; use anyhow::{Error, Result}; use lazy_static::lazy_static; use log::*; @@ -32,6 +30,8 @@ use std::thread; use std::thread::sleep; use std::time::{Duration, SystemTime}; use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZTenantId; +use zenith_utils::zid::ZTimelineId; // // We keep one WAL Receiver active per timeline. 
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index dd06266a12..a2acd84f0d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -36,13 +36,13 @@ use tokio::process::{ChildStdin, ChildStdout, Command}; use tokio::time::timeout; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZTenantId; use crate::relish::*; use crate::repository::WALRecord; use crate::waldecoder::XlXactParsedRecord; use crate::waldecoder::{MultiXactId, XlMultiXactCreate}; use crate::PageServerConf; -use crate::ZTenantId; use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; use postgres_ffi::XLogRecord; diff --git a/test_runner/Pipfile b/test_runner/Pipfile index d115ccbcda..416e3c7eed 100644 --- a/test_runner/Pipfile +++ b/test_runner/Pipfile @@ -7,6 +7,7 @@ name = "pypi" pytest = ">=6.0.0" psycopg2 = "*" typing-extensions = "*" +pyjwt = {extras = ["crypto"], version = "*"} [dev-packages] yapf = "*" diff --git a/test_runner/Pipfile.lock b/test_runner/Pipfile.lock index d3a7ea5de2..aa8ad3bae0 100644 --- a/test_runner/Pipfile.lock +++ b/test_runner/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "4c20c05c20c50cf7e8f78ab461ab23841125345e63e00e2efa7661c165b6b364" + "sha256": "f60a966726bcc19670402ad3fa57396b5dacf0a027544418ceb7cc0d42d94a52" }, "pipfile-spec": 6, "requires": { @@ -24,13 +24,72 @@ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==21.2.0" }, - "importlib-metadata": { + "cffi": { "hashes": [ - "sha256:833b26fb89d5de469b24a390e9df088d4e52e4ba33b01dc5e0e4f41b81a16c00", - "sha256:b142cc1dd1342f31ff04bb7d022492b09920cb64fed867cd3ea6f80fe3ebd139" + "sha256:06c54a68935738d206570b20da5ef2b6b6d92b38ef3ec45c5422c0ebaf338d4d", + "sha256:0c0591bee64e438883b0c92a7bed78f6290d40bf02e54c5bf0978eaf36061771", + "sha256:19ca0dbdeda3b2615421d54bef8985f72af6e0c47082a8d26122adac81a95872", + 
"sha256:22b9c3c320171c108e903d61a3723b51e37aaa8c81255b5e7ce102775bd01e2c", + "sha256:26bb2549b72708c833f5abe62b756176022a7b9a7f689b571e74c8478ead51dc", + "sha256:33791e8a2dc2953f28b8d8d300dde42dd929ac28f974c4b4c6272cb2955cb762", + "sha256:3c8d896becff2fa653dc4438b54a5a25a971d1f4110b32bd3068db3722c80202", + "sha256:4373612d59c404baeb7cbd788a18b2b2a8331abcc84c3ba40051fcd18b17a4d5", + "sha256:487d63e1454627c8e47dd230025780e91869cfba4c753a74fda196a1f6ad6548", + "sha256:48916e459c54c4a70e52745639f1db524542140433599e13911b2f329834276a", + "sha256:4922cd707b25e623b902c86188aca466d3620892db76c0bdd7b99a3d5e61d35f", + "sha256:55af55e32ae468e9946f741a5d51f9896da6b9bf0bbdd326843fec05c730eb20", + "sha256:57e555a9feb4a8460415f1aac331a2dc833b1115284f7ded7278b54afc5bd218", + "sha256:5d4b68e216fc65e9fe4f524c177b54964af043dde734807586cf5435af84045c", + "sha256:64fda793737bc4037521d4899be780534b9aea552eb673b9833b01f945904c2e", + "sha256:6d6169cb3c6c2ad50db5b868db6491a790300ade1ed5d1da29289d73bbe40b56", + "sha256:7bcac9a2b4fdbed2c16fa5681356d7121ecabf041f18d97ed5b8e0dd38a80224", + "sha256:80b06212075346b5546b0417b9f2bf467fea3bfe7352f781ffc05a8ab24ba14a", + "sha256:818014c754cd3dba7229c0f5884396264d51ffb87ec86e927ef0be140bfdb0d2", + "sha256:8eb687582ed7cd8c4bdbff3df6c0da443eb89c3c72e6e5dcdd9c81729712791a", + "sha256:99f27fefe34c37ba9875f224a8f36e31d744d8083e00f520f133cab79ad5e819", + "sha256:9f3e33c28cd39d1b655ed1ba7247133b6f7fc16fa16887b120c0c670e35ce346", + "sha256:a8661b2ce9694ca01c529bfa204dbb144b275a31685a075ce123f12331be790b", + "sha256:a9da7010cec5a12193d1af9872a00888f396aba3dc79186604a09ea3ee7c029e", + "sha256:aedb15f0a5a5949ecb129a82b72b19df97bbbca024081ed2ef88bd5c0a610534", + "sha256:b315d709717a99f4b27b59b021e6207c64620790ca3e0bde636a6c7f14618abb", + "sha256:ba6f2b3f452e150945d58f4badd92310449876c4c954836cfb1803bdd7b422f0", + "sha256:c33d18eb6e6bc36f09d793c0dc58b0211fccc6ae5149b808da4a62660678b156", + "sha256:c9a875ce9d7fe32887784274dd533c57909b7b1dcadcc128a2ac21331a9765dd", 
+ "sha256:c9e005e9bd57bc987764c32a1bee4364c44fdc11a3cc20a40b93b444984f2b87", + "sha256:d2ad4d668a5c0645d281dcd17aff2be3212bc109b33814bbb15c4939f44181cc", + "sha256:d950695ae4381ecd856bcaf2b1e866720e4ab9a1498cba61c602e56630ca7195", + "sha256:e22dcb48709fc51a7b58a927391b23ab37eb3737a98ac4338e2448bef8559b33", + "sha256:e8c6a99be100371dbb046880e7a282152aa5d6127ae01783e37662ef73850d8f", + "sha256:e9dc245e3ac69c92ee4c167fbdd7428ec1956d4e754223124991ef29eb57a09d", + "sha256:eb687a11f0a7a1839719edd80f41e459cc5366857ecbed383ff376c4e3cc6afd", + "sha256:eb9e2a346c5238a30a746893f23a9535e700f8192a68c07c0258e7ece6ff3728", + "sha256:ed38b924ce794e505647f7c331b22a693bee1538fdf46b0222c4717b42f744e7", + "sha256:f0010c6f9d1a4011e429109fda55a225921e3206e7f62a0c22a35344bfd13cca", + "sha256:f0c5d1acbfca6ebdd6b1e3eded8d261affb6ddcf2186205518f1428b8569bb99", + "sha256:f10afb1004f102c7868ebfe91c28f4a712227fe4cb24974350ace1f90e1febbf", + "sha256:f174135f5609428cc6e1b9090f9268f5c8935fddb1b25ccb8255a2d50de6789e", + "sha256:f3ebe6e73c319340830a9b2825d32eb6d8475c1dac020b4f0aa774ee3b898d1c", + "sha256:f627688813d0a4140153ff532537fbe4afea5a3dffce1f9deb7f91f848a832b5", + "sha256:fd4305f86f53dfd8cd3522269ed7fc34856a8ee3709a5e28b2836b2db9d4cd69" ], - "markers": "python_version < '3.8'", - "version": "==4.5.0" + "version": "==1.14.6" + }, + "cryptography": { + "hashes": [ + "sha256:0f1212a66329c80d68aeeb39b8a16d54ef57071bf22ff4e521657b27372e327d", + "sha256:1e056c28420c072c5e3cb36e2b23ee55e260cb04eee08f702e0edfec3fb51959", + "sha256:240f5c21aef0b73f40bb9f78d2caff73186700bf1bc6b94285699aff98cc16c6", + "sha256:26965837447f9c82f1855e0bc8bc4fb910240b6e0d16a664bb722df3b5b06873", + "sha256:37340614f8a5d2fb9aeea67fd159bfe4f5f4ed535b1090ce8ec428b2f15a11f2", + "sha256:3d10de8116d25649631977cb37da6cbdd2d6fa0e0281d014a5b7d337255ca713", + "sha256:3d8427734c781ea5f1b41d6589c293089704d4759e34597dce91014ac125aad1", + "sha256:7ec5d3b029f5fa2b179325908b9cd93db28ab7b85bb6c1db56b10e0b54235177", + 
"sha256:8e56e16617872b0957d1c9742a3f94b43533447fd78321514abbe7db216aa250", + "sha256:de4e5f7f68220d92b7637fc99847475b59154b7a1b3868fb7385337af54ac9ca", + "sha256:eb8cc2afe8b05acbd84a43905832ec78e7b3873fb124ca190f574dca7389a87d", + "sha256:ee77aa129f481be46f8d92a1a7db57269a2f23052d5f2433b4621bb457081cc9" + ], + "version": "==3.4.7" }, "iniconfig": { "hashes": [ @@ -41,11 +100,11 @@ }, "packaging": { "hashes": [ - "sha256:5b327ac1320dc863dca72f4514ecc086f31186744b84a230374cc1fd776feae5", - "sha256:67714da7f7bc052e064859c05c595155bd1ee9f69f76557e21f051443c20947a" + "sha256:7dc96269f53a4ccec5c0670940a4281106dd0bb343f47b7471f779df49c2fbe7", + "sha256:c86254f9220d55e31cc94d69bade760f0847da8000def4dfe1c6b872fd14ff14" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", - "version": "==20.9" + "markers": "python_version >= '3.6'", + "version": "==21.0" }, "pluggy": { "hashes": [ @@ -57,18 +116,18 @@ }, "psycopg2": { "hashes": [ - "sha256:03a485bf71498870e38b535c0e6e7162d6ac06a91487edddc3b959894d65f79c", - "sha256:22102cfeb904898254f287b1a77360bf66c636858e7476593acd5267e5c24ff9", - "sha256:8f4c1800e57ad128d20b2e91d222ca238fffd316cef65be781361cdf35e37979", - "sha256:b12073fdf2002e828e5921be2c39ff9c6eab361c5c0bd6c529619fc23677accc", - "sha256:b6f47af317af8110818d255e693cfa80b7f1e435285be09778db7b66efd95789", - "sha256:d549db98fc0e6db41a2aa0d65f7434c4308a9f64012adb209b9e489f26fe87c6", - "sha256:e44e39a46af7c30566b7667fb27e701e652ab0a51e05c263a01d3ff0e223b765", - "sha256:e84c80be7a238d3c9c099b71f6890eaa35fc881146232cce888a88ab1bfb431e", - "sha256:f3d42bd42302293767b84206d9a446abc67ed4a133e4fe04dad8952de06c2091" + "sha256:079d97fc22de90da1d370c90583659a9f9a6ee4007355f5825e5f1c70dffc1fa", + "sha256:2087013c159a73e09713294a44d0c8008204d06326006b7f652bef5ace66eebb", + "sha256:2c992196719fadda59f72d44603ee1a2fdcc67de097eea38d41c7ad9ad246e62", + "sha256:7640e1e4d72444ef012e275e7b53204d7fab341fb22bc76057ede22fe6860b25", + 
"sha256:7f91312f065df517187134cce8e395ab37f5b601a42446bdc0f0d51773621854", + "sha256:830c8e8dddab6b6716a4bf73a09910c7954a92f40cf1d1e702fb93c8a919cc56", + "sha256:89409d369f4882c47f7ea20c42c5046879ce22c1e4ea20ef3b00a4dfc0a7f188", + "sha256:bf35a25f1aaa8a3781195595577fcbb59934856ee46b4f252f56ad12b8043bcf", + "sha256:de5303a6f1d0a7a34b9d40e4d3bef684ccc44a49bbe3eb85e3c0bffb4a131b7c" ], "index": "pypi", - "version": "==2.9" + "version": "==2.9.1" }, "py": { "hashes": [ @@ -78,6 +137,25 @@ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.10.0" }, + "pycparser": { + "hashes": [ + "sha256:2d475327684562c3a96cc71adf7dc8c4f0565175cf86b6d7a404ff4c771f15f0", + "sha256:7582ad22678f0fcd81102833f60ef8d0e57288b6b5fb00323d101be910e35705" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "version": "==2.20" + }, + "pyjwt": { + "extras": [ + "crypto" + ], + "hashes": [ + "sha256:934d73fbba91b0483d3857d1aff50e96b2a892384ee2c17417ed3203f173fca1", + "sha256:fba44e7898bbca160a2b2b501f492824fc8382485d3a6f11ba5d0c1937ce6130" + ], + "index": "pypi", + "version": "==2.1.0" + }, "pyparsing": { "hashes": [ "sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1", @@ -110,14 +188,6 @@ ], "index": "pypi", "version": "==3.10.0.0" - }, - "zipp": { - "hashes": [ - "sha256:3607921face881ba3e026887d8150cca609d517579abe052ac81fc5aeffdbd76", - "sha256:51cb66cc54621609dd593d1787f286ee42a5c0adbb4b29abea5a63edc3e03098" - ], - "markers": "python_version >= '3.6'", - "version": "==3.4.1" } }, "develop": { @@ -129,14 +199,6 @@ "index": "pypi", "version": "==3.9.2" }, - "importlib-metadata": { - "hashes": [ - "sha256:833b26fb89d5de469b24a390e9df088d4e52e4ba33b01dc5e0e4f41b81a16c00", - "sha256:b142cc1dd1342f31ff04bb7d022492b09920cb64fed867cd3ea6f80fe3ebd139" - ], - "markers": "python_version < '3.8'", - "version": "==4.5.0" - }, "mccabe": { "hashes": [ 
"sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42", @@ -146,32 +208,32 @@ }, "mypy": { "hashes": [ - "sha256:0190fb77e93ce971954c9e54ea61de2802065174e5e990c9d4c1d0f54fbeeca2", - "sha256:0756529da2dd4d53d26096b7969ce0a47997123261a5432b48cc6848a2cb0bd4", - "sha256:2f9fedc1f186697fda191e634ac1d02f03d4c260212ccb018fabbb6d4b03eee8", - "sha256:353aac2ce41ddeaf7599f1c73fed2b75750bef3b44b6ad12985a991bc002a0da", - "sha256:3f12705eabdd274b98f676e3e5a89f247ea86dc1af48a2d5a2b080abac4e1243", - "sha256:4efc67b9b3e2fddbe395700f91d5b8deb5980bfaaccb77b306310bd0b9e002eb", - "sha256:517e7528d1be7e187a5db7f0a3e479747307c1b897d9706b1c662014faba3116", - "sha256:68a098c104ae2b75e946b107ef69dd8398d54cb52ad57580dfb9fc78f7f997f0", - "sha256:746e0b0101b8efec34902810047f26a8c80e1efbb4fc554956d848c05ef85d76", - "sha256:8be7bbd091886bde9fcafed8dd089a766fa76eb223135fe5c9e9798f78023a20", - "sha256:9236c21194fde5df1b4d8ebc2ef2c1f2a5dc7f18bcbea54274937cae2e20a01c", - "sha256:9ef5355eaaf7a23ab157c21a44c614365238a7bdb3552ec3b80c393697d974e1", - "sha256:9f1d74eeb3f58c7bd3f3f92b8f63cb1678466a55e2c4612bf36909105d0724ab", - "sha256:a26d0e53e90815c765f91966442775cf03b8a7514a4e960de7b5320208b07269", - "sha256:ae94c31bb556ddb2310e4f913b706696ccbd43c62d3331cd3511caef466871d2", - "sha256:b5ba1f0d5f9087e03bf5958c28d421a03a4c1ad260bf81556195dffeccd979c4", - "sha256:b5dfcd22c6bab08dfeded8d5b44bdcb68c6f1ab261861e35c470b89074f78a70", - "sha256:cd01c599cf9f897b6b6c6b5d8b182557fb7d99326bcdf5d449a0fbbb4ccee4b9", - "sha256:e89880168c67cf4fde4506b80ee42f1537ad66ad366c101d388b3fd7d7ce2afd", - "sha256:ebe2bc9cb638475f5d39068d2dbe8ae1d605bb8d8d3ff281c695df1670ab3987", - "sha256:f89bfda7f0f66b789792ab64ce0978e4a991a0e4dd6197349d0767b0f1095b21", - "sha256:fc4d63da57ef0e8cd4ab45131f3fe5c286ce7dd7f032650d0fbc239c6190e167", - "sha256:fd634bc17b1e2d6ce716f0e43446d0d61cdadb1efcad5c56ca211c22b246ebc8" + "sha256:088cd9c7904b4ad80bec811053272986611b84221835e079be5bcad029e79dd9", + 
"sha256:0aadfb2d3935988ec3815952e44058a3100499f5be5b28c34ac9d79f002a4a9a", + "sha256:119bed3832d961f3a880787bf621634ba042cb8dc850a7429f643508eeac97b9", + "sha256:1a85e280d4d217150ce8cb1a6dddffd14e753a4e0c3cf90baabb32cefa41b59e", + "sha256:3c4b8ca36877fc75339253721f69603a9c7fdb5d4d5a95a1a1b899d8b86a4de2", + "sha256:3e382b29f8e0ccf19a2df2b29a167591245df90c0b5a2542249873b5c1d78212", + "sha256:42c266ced41b65ed40a282c575705325fa7991af370036d3f134518336636f5b", + "sha256:53fd2eb27a8ee2892614370896956af2ff61254c275aaee4c230ae771cadd885", + "sha256:704098302473cb31a218f1775a873b376b30b4c18229421e9e9dc8916fd16150", + "sha256:7df1ead20c81371ccd6091fa3e2878559b5c4d4caadaf1a484cf88d93ca06703", + "sha256:866c41f28cee548475f146aa4d39a51cf3b6a84246969f3759cb3e9c742fc072", + "sha256:a155d80ea6cee511a3694b108c4494a39f42de11ee4e61e72bc424c490e46457", + "sha256:adaeee09bfde366d2c13fe6093a7df5df83c9a2ba98638c7d76b010694db760e", + "sha256:b6fb13123aeef4a3abbcfd7e71773ff3ff1526a7d3dc538f3929a49b42be03f0", + "sha256:b94e4b785e304a04ea0828759172a15add27088520dc7e49ceade7834275bedb", + "sha256:c0df2d30ed496a08de5daed2a9ea807d07c21ae0ab23acf541ab88c24b26ab97", + "sha256:c6c2602dffb74867498f86e6129fd52a2770c48b7cd3ece77ada4fa38f94eba8", + "sha256:ceb6e0a6e27fb364fb3853389607cf7eb3a126ad335790fa1e14ed02fba50811", + "sha256:d9dd839eb0dc1bbe866a288ba3c1afc33a202015d2ad83b31e875b5905a079b6", + "sha256:e4dab234478e3bd3ce83bac4193b2ecd9cf94e720ddd95ce69840273bf44f6de", + "sha256:ec4e0cd079db280b6bdabdc807047ff3e199f334050db5cbb91ba3e959a67504", + "sha256:ecd2c3fe726758037234c93df7e98deb257fd15c24c9180dacf1ef829da5f921", + "sha256:ef565033fa5a958e62796867b1df10c40263ea9ded87164d67572834e57a174d" ], "index": "pypi", - "version": "==0.902" + "version": "==0.910" }, "mypy-extensions": { "hashes": [ @@ -204,42 +266,6 @@ "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==0.10.2" }, - "typed-ast": { - "hashes": [ - 
"sha256:01ae5f73431d21eead5015997ab41afa53aa1fbe252f9da060be5dad2c730ace", - "sha256:067a74454df670dcaa4e59349a2e5c81e567d8d65458d480a5b3dfecec08c5ff", - "sha256:0fb71b8c643187d7492c1f8352f2c15b4c4af3f6338f21681d3681b3dc31a266", - "sha256:1b3ead4a96c9101bef08f9f7d1217c096f31667617b58de957f690c92378b528", - "sha256:2068531575a125b87a41802130fa7e29f26c09a2833fea68d9a40cf33902eba6", - "sha256:209596a4ec71d990d71d5e0d312ac935d86930e6eecff6ccc7007fe54d703808", - "sha256:2c726c276d09fc5c414693a2de063f521052d9ea7c240ce553316f70656c84d4", - "sha256:398e44cd480f4d2b7ee8d98385ca104e35c81525dd98c519acff1b79bdaac363", - "sha256:52b1eb8c83f178ab787f3a4283f68258525f8d70f778a2f6dd54d3b5e5fb4341", - "sha256:5feca99c17af94057417d744607b82dd0a664fd5e4ca98061480fd8b14b18d04", - "sha256:7538e495704e2ccda9b234b82423a4038f324f3a10c43bc088a1636180f11a41", - "sha256:760ad187b1041a154f0e4d0f6aae3e40fdb51d6de16e5c99aedadd9246450e9e", - "sha256:777a26c84bea6cd934422ac2e3b78863a37017618b6e5c08f92ef69853e765d3", - "sha256:95431a26309a21874005845c21118c83991c63ea800dd44843e42a916aec5899", - "sha256:9ad2c92ec681e02baf81fdfa056fe0d818645efa9af1f1cd5fd6f1bd2bdfd805", - "sha256:9c6d1a54552b5330bc657b7ef0eae25d00ba7ffe85d9ea8ae6540d2197a3788c", - "sha256:aee0c1256be6c07bd3e1263ff920c325b59849dc95392a05f258bb9b259cf39c", - "sha256:af3d4a73793725138d6b334d9d247ce7e5f084d96284ed23f22ee626a7b88e39", - "sha256:b36b4f3920103a25e1d5d024d155c504080959582b928e91cb608a65c3a49e1a", - "sha256:b9574c6f03f685070d859e75c7f9eeca02d6933273b5e69572e5ff9d5e3931c3", - "sha256:bff6ad71c81b3bba8fa35f0f1921fb24ff4476235a6e94a26ada2e54370e6da7", - "sha256:c190f0899e9f9f8b6b7863debfb739abcb21a5c054f911ca3596d12b8a4c4c7f", - "sha256:c907f561b1e83e93fad565bac5ba9c22d96a54e7ea0267c708bffe863cbe4075", - "sha256:cae53c389825d3b46fb37538441f75d6aecc4174f615d048321b716df2757fb0", - "sha256:dd4a21253f42b8d2b48410cb31fe501d32f8b9fbeb1f55063ad102fe9c425e40", - "sha256:dde816ca9dac1d9c01dd504ea5967821606f02e510438120091b84e852367428", 
- "sha256:f2362f3cb0f3172c42938946dbc5b7843c2a28aec307c49100c8b38764eb6927", - "sha256:f328adcfebed9f11301eaedfa48e15bdece9b519fb27e6a8c01aa52a17ec31b3", - "sha256:f8afcf15cc511ada719a88e013cec87c11aff7b91f019295eb4530f96fe5ef2f", - "sha256:fb1bbeac803adea29cedd70781399c99138358c26d05fcbd23c13016b7f5ec65" - ], - "markers": "python_version < '3.8'", - "version": "==1.4.3" - }, "typing-extensions": { "hashes": [ "sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497", @@ -256,14 +282,6 @@ ], "index": "pypi", "version": "==0.31.0" - }, - "zipp": { - "hashes": [ - "sha256:3607921face881ba3e026887d8150cca609d517579abe052ac81fc5aeffdbd76", - "sha256:51cb66cc54621609dd593d1787f286ee42a5c0adbb4b29abea5a63edc3e03098" - ], - "markers": "python_version >= '3.6'", - "version": "==3.4.1" } } } diff --git a/test_runner/batch_others/test_auth.py b/test_runner/batch_others/test_auth.py new file mode 100644 index 0000000000..062e54c3be --- /dev/null +++ b/test_runner/batch_others/test_auth.py @@ -0,0 +1,98 @@ + +from contextlib import closing +from pathlib import Path +from uuid import uuid4 +from dataclasses import dataclass +import jwt +import psycopg2 +from fixtures.zenith_fixtures import Postgres, ZenithCli, ZenithPageserver +import pytest + + +@pytest.fixture +def pageserver_auth_enabled(zenith_cli: ZenithCli): + with ZenithPageserver(zenith_cli).init(enable_auth=True).start() as ps: + # For convenience in tests, create a branch from the freshly-initialized cluster. 
+ zenith_cli.run(["branch", "empty", "main"]) + yield ps + + +@dataclass +class AuthKeys: + pub: bytes + priv: bytes + + +@pytest.fixture +def auth_keys(repo_dir: str) -> AuthKeys: + # TODO probably this should be specified in cli config and used in tests for single source of truth + pub = (Path(repo_dir) / 'auth_public_key.pem').read_bytes() + priv = (Path(repo_dir) / 'auth_private_key.pem').read_bytes() + return AuthKeys(pub=pub, priv=priv) + + +def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver, auth_keys: AuthKeys): + ps = pageserver_auth_enabled + + tenant_token = jwt.encode({"scope": "tenant", "tenant_id": ps.initial_tenant}, auth_keys.priv, algorithm="RS256") + invalid_tenant_token = jwt.encode({"scope": "tenant", "tenant_id": uuid4().hex}, auth_keys.priv, algorithm="RS256") + management_token = jwt.encode({"scope": "pageserverapi"}, auth_keys.priv, algorithm="RS256") + + # this does not invoke auth check and only decodes jwt and checks it for validity + # check both tokens + ps.safe_psql("status", password=tenant_token) + ps.safe_psql("status", password=management_token) + + # tenant can create branches + ps.safe_psql(f"branch_create {ps.initial_tenant} new1 main", password=tenant_token) + # console can create branches for tenant + ps.safe_psql(f"branch_create {ps.initial_tenant} new2 main", password=management_token) + + # fail to create branch using token with different tenantid + with pytest.raises(psycopg2.DatabaseError, match='Tenant id mismatch. Permission denied'): + ps.safe_psql(f"branch_create {ps.initial_tenant} new2 main", password=invalid_tenant_token) + + # create tenant using management token + ps.safe_psql(f"tenant_create {uuid4().hex}", password=management_token) + + # fail to create tenant using tenant token + with pytest.raises(psycopg2.DatabaseError, match='Attempt to access management api with tenant scope. 
Permission denied'): + ps.safe_psql(f"tenant_create {uuid4().hex}", password=tenant_token) + + +@pytest.mark.parametrize('with_wal_acceptors', [False, True]) +def test_compute_auth_to_pageserver( + zenith_cli: ZenithCli, + wa_factory, + pageserver_auth_enabled: ZenithPageserver, + repo_dir: str, + with_wal_acceptors: bool, + auth_keys: AuthKeys, +): + ps = pageserver_auth_enabled + # since we are in progress of refactoring protocols between compute safekeeper and page server + # use hardcoded management token in safekeeper + management_token = jwt.encode({"scope": "pageserverapi"}, auth_keys.priv, algorithm="RS256") + + branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}" + zenith_cli.run(["branch", branch, "empty"]) + if with_wal_acceptors: + wa_factory.start_n_new(3, management_token) + + with Postgres( + zenith_cli=zenith_cli, + repo_dir=repo_dir, + tenant_id=ps.initial_tenant, + port=55432, # FIXME port distribution is hardcoded in tests and in cli + ).create_start( + branch, + wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, + ) as pg: + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + cur.execute('CREATE TABLE t(key int primary key, value text)') + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + cur.execute('SELECT sum(key) FROM t') + assert cur.fetchone() == (5000050000, ) diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 076f5a3ff5..e76debcea1 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -87,23 +87,30 @@ class PgProtocol: self.port = port self.username = username or "zenith_admin" - def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None) -> str: + def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None, password: Optional[str] = None) -> 
str: """ Build a libpq connection string for the Postgres instance. """ username = username or self.username - return f'host={self.host} port={self.port} user={username} dbname={dbname}' + res = f'host={self.host} port={self.port} user={username} dbname={dbname}' + if not password: + return res + return f'{res} password={password}' # autocommit=True here by default because that's what we need most of the time - def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection: + def connect(self, *, autocommit=True, dbname: str = 'postgres', username: Optional[str] = None, password: Optional[str] = None) -> PgConnection: """ Connect to the node. Returns psycopg2's connection object. This method passes all extra params to connstr. """ - conn = psycopg2.connect(self.connstr(**kwargs)) + conn = psycopg2.connect(self.connstr( + dbname=dbname, + username=username, + password=password, + )) # WARNING: this setting affects *all* tests! conn.autocommit = autocommit return conn @@ -175,12 +182,15 @@ class ZenithPageserver(PgProtocol): self.running = False self.initial_tenant = None - def init(self) -> 'ZenithPageserver': + def init(self, enable_auth: bool = False) -> 'ZenithPageserver': """ Initialize the repository, i.e. run "zenith init". Returns self. """ - self.zenith_cli.run(['init']) + cmd = ['init'] + if enable_auth: + cmd.append('--enable-auth') + self.zenith_cli.run(cmd) return self def start(self) -> 'ZenithPageserver': @@ -207,6 +217,12 @@ class ZenithPageserver(PgProtocol): return self + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + self.stop() + @zenfixture def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]: @@ -235,11 +251,10 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]: class Postgres(PgProtocol): """ An object representing a running postgres daemon. 
""" - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, instance_num: int, tenant_id: str): - super().__init__(host='localhost', port=55431 + instance_num) + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int): + super().__init__(host='localhost', port=port) self.zenith_cli = zenith_cli - self.instance_num = instance_num self.running = False self.repo_dir = repo_dir self.branch: Optional[str] = None # dubious, see asserts below @@ -373,15 +388,22 @@ class Postgres(PgProtocol): return self + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + self.stop() + class PostgresFactory: """ An object representing multiple running postgres daemons. """ - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431): self.zenith_cli = zenith_cli self.repo_dir = repo_dir self.num_instances = 0 self.instances: List[Postgres] = [] self.initial_tenant: str = initial_tenant + self.base_port = base_port def create_start( self, @@ -391,7 +413,13 @@ class PostgresFactory: config_lines: Optional[List[str]] = None ) -> Postgres: - pg = Postgres(self.zenith_cli, self.repo_dir, self.num_instances + 1, tenant_id=tenant_id or self.initial_tenant) + pg = Postgres( + zenith_cli=self.zenith_cli, + repo_dir=self.repo_dir, + tenant_id=tenant_id or self.initial_tenant, + port=self.base_port + self.num_instances + 1, + ) + self.num_instances += 1 self.instances.append(pg) @@ -490,11 +518,12 @@ def read_pid(path): class WalAcceptor: """ An object representing a running wal acceptor daemon. 
""" - def __init__(self, wa_binpath, data_dir, port, num): + def __init__(self, wa_binpath, data_dir, port, num, auth_token: Optional[str] = None): self.wa_binpath = wa_binpath self.data_dir = data_dir self.port = port self.num = num # identifier for logging + self.auth_token = auth_token def start(self) -> 'WalAcceptor': # create data directory if not exists @@ -509,7 +538,8 @@ class WalAcceptor: cmd.extend(["--pageserver", "localhost:{}".format(DEFAULT_PAGESERVER_PORT)]) cmd.extend(["--recall", "1 second"]) print('Running command "{}"'.format(' '.join(cmd))) - subprocess.run(cmd, check=True) + env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None + subprocess.run(cmd, check=True, env=env) return self @@ -545,26 +575,30 @@ class WalAcceptorFactory: self.instances = [] self.initial_port = 54321 - def start_new(self) -> WalAcceptor: + def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor: """ Start new wal acceptor. """ wa_num = len(self.instances) - wa = WalAcceptor(self.wa_binpath, - os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)), - self.initial_port + wa_num, wa_num) + wa = WalAcceptor( + self.wa_binpath, + os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)), + self.initial_port + wa_num, + wa_num, + auth_token, + ) wa.start() self.instances.append(wa) return wa - def start_n_new(self, n: int) -> None: + def start_n_new(self, n: int, auth_token: Optional[str] = None) -> None: """ Start n new wal acceptors. 
""" for _ in range(n): - self.start_new() + self.start_new(auth_token) def stop_all(self) -> 'WalAcceptorFactory': for wa in self.instances: diff --git a/vendor/postgres b/vendor/postgres index 2d6ef4b320..59ad940fcb 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 2d6ef4b32019f70732bffbf8a5a2c93220872b7e +Subproject commit 59ad940fcbd7a7b26de3957cfc8f34b3f95a8fe6 diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index 9594de4308..cf7cea19cc 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -6,9 +6,9 @@ use clap::{App, Arg}; use daemonize::Daemonize; use log::*; use slog::Drain; -use std::io; use std::path::{Path, PathBuf}; use std::thread; +use std::{env, io}; use std::{fs::File, fs::OpenOptions}; use walkeeper::s3_offload; @@ -74,6 +74,7 @@ fn main() -> Result<()> { listen_addr: "localhost:5454".to_string(), ttl: None, recall_period: None, + pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(), }; if let Some(dir) = arg_matches.value_of("datadir") { diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 47a5ecaa0c..399669130b 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -16,6 +16,8 @@ pub struct WalAcceptorConf { pub no_sync: bool, pub listen_addr: String, pub pageserver_addr: Option, + // TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework + pub pageserver_auth_token: Option, pub ttl: Option, pub recall_period: Option, } diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 5a701dba17..e6eb9cb5c6 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -17,11 +17,11 @@ use std::thread::sleep; use zenith_utils::bin_ser::LeSer; use zenith_utils::connstring::connection_host_port; use zenith_utils::lsn::Lsn; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; use crate::replication::HotStandbyFeedback; use crate::timeline::{Timeline, TimelineTools}; 
use crate::WalAcceptorConf; -use pageserver::{ZTenantId, ZTimelineId}; use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; use zenith_utils::pq_proto::SystemId; @@ -153,7 +153,11 @@ pub struct ReceiveWalConn { /// fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZTenantId) { let ps_addr = conf.pageserver_addr.unwrap(); - let ps_connstr = format!("postgresql://no_user@{}/no_db", ps_addr); + let ps_connstr = format!( + "postgresql://no_user:{}@{}/no_db", + &conf.pageserver_auth_token.unwrap_or_default(), + ps_addr + ); // use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr); diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 711be4b9ca..e10d9bdf16 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -7,12 +7,12 @@ use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; use anyhow::{bail, Result}; use bytes::Bytes; -use pageserver::ZTimelineId; use std::str::FromStr; use std::sync::Arc; use zenith_utils::postgres_backend; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor}; +use zenith_utils::zid::ZTimelineId; /// Handler for streaming WAL from acceptor pub struct SendWalHandler { diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 3a2ff235cd..6aaabeaa78 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -5,7 +5,6 @@ use anyhow::{bail, Result}; use fs2::FileExt; use lazy_static::lazy_static; use log::*; -use pageserver::ZTimelineId; use postgres_ffi::xlog_utils::{find_end_of_wal, TimeLineID}; use std::cmp::{max, min}; use std::collections::HashMap; @@ -15,6 +14,7 @@ use std::path::Path; use std::sync::{Arc, Condvar, Mutex}; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; +use 
zenith_utils::zid::ZTimelineId; use crate::receive_wal::{SafeKeeperInfo, CONTROL_FILE_NAME, SK_FORMAT_VERSION, SK_MAGIC}; use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER}; diff --git a/zenith/Cargo.toml b/zenith/Cargo.toml index d587a48881..6b2f54582d 100644 --- a/zenith/Cargo.toml +++ b/zenith/Cargo.toml @@ -12,7 +12,7 @@ anyhow = "1.0" serde_json = "1" postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } -# FIXME: 'pageserver' is needed for ZTimelineId. Refactor +# FIXME: 'pageserver' is needed for BranchInfo. Refactor pageserver = { path = "../pageserver" } control_plane = { path = "../control_plane" } postgres_ffi = { path = "../postgres_ffi" } diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 4c9c019893..e3f6464ef0 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -4,13 +4,15 @@ use clap::{App, AppSettings, Arg, ArgMatches, SubCommand}; use control_plane::compute::ComputeControlPlane; use control_plane::local_env::{self, LocalEnv}; use control_plane::storage::PageServerNode; -use pageserver::ZTenantId; use std::collections::btree_map::Entry; use std::collections::HashMap; use std::process::exit; use std::str::FromStr; +use zenith_utils::auth::{encode_from_key_path, Claims, Scope}; +use zenith_utils::postgres_backend::AuthType; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; -use pageserver::{branches::BranchInfo, ZTimelineId}; +use pageserver::branches::BranchInfo; use zenith_utils::lsn::Lsn; /// @@ -53,6 +55,12 @@ fn main() -> Result<()> { .long("remote-pageserver") .required(false) .value_name("pageserver-url"), + ) + .arg( + Arg::with_name("enable-auth") + .long("enable-auth") + .takes_value(false) + .help("Enable authentication using ZenithJWT") ), ) .subcommand( @@ -115,10 +123,16 @@ fn main() -> Result<()> { .get_matches(); // Create config file - if let ("init", Some(sub_args)) = matches.subcommand() { + if let ("init", Some(init_match)) = 
matches.subcommand() { let tenantid = ZTenantId::generate(); - let pageserver_uri = sub_args.value_of("pageserver-url"); - local_env::init(pageserver_uri, tenantid) + let pageserver_uri = init_match.value_of("pageserver-url"); + let auth_type = if init_match.is_present("enable-auth") { + AuthType::ZenithJWT + } else { + AuthType::Trust + }; + + local_env::init(pageserver_uri, tenantid, auth_type) .with_context(|| "Failed to create config file")?; } @@ -132,9 +146,12 @@ fn main() -> Result<()> { }; match matches.subcommand() { - ("init", Some(_)) => { + ("init", Some(init_match)) => { let pageserver = PageServerNode::from_env(&env); - if let Err(e) = pageserver.init(Some(&env.tenantid.to_string())) { + if let Err(e) = pageserver.init( + Some(&env.tenantid.to_string()), + init_match.is_present("enable-auth"), + ) { eprintln!("pageserver init failed: {}", e); exit(1); } @@ -447,12 +464,19 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let node = cplane.nodes.get(&(tenantid, timeline_name.to_owned())); + let auth_token = if matches!(env.auth_type, AuthType::ZenithJWT) { + let claims = Claims::new(Some(tenantid), Scope::Tenant); + Some(encode_from_key_path(&claims, &env.private_key_path)?) 
+ } else { + None + }; + println!("Starting postgres on timeline {}...", timeline_name); if let Some(node) = node { - node.start()?; + node.start(&auth_token)?; } else { let node = cplane.new_node(tenantid, timeline_name)?; - node.start()?; + node.start(&auth_token)?; } } ("stop", Some(stop_match)) => { diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml index 01b057f78d..9783ced88d 100644 --- a/zenith_utils/Cargo.toml +++ b/zenith_utils/Cargo.toml @@ -20,6 +20,8 @@ tokio = { version = "1.5.0", features = ["full"] } zenith_metrics = { path = "../zenith_metrics" } workspace_hack = { path = "../workspace_hack" } rand = "0.8.3" +jsonwebtoken = "7" +hex = { version = "0.4.3", features = ["serde"] } [dev-dependencies] hex-literal = "0.3" diff --git a/zenith_utils/src/auth.rs b/zenith_utils/src/auth.rs new file mode 100644 index 0000000000..06f81fd29c --- /dev/null +++ b/zenith_utils/src/auth.rs @@ -0,0 +1,104 @@ +// For details about authentication see docs/authentication.md +// TODO there are two issues for our use case in jsonwebtoken library which will be resolved in next release +// The fisrt one is that there is no way to disable expiration claim, but it can be excluded from validation, so use this as a workaround for now. +// Relevant issue: https://github.com/Keats/jsonwebtoken/issues/190 +// The second one is that we wanted to use ed25519 keys, but they are also not supported until next version. So we go with RSA keys for now. 
+// Relevant issue: https://github.com/Keats/jsonwebtoken/issues/162 + +use hex::{self, FromHex}; +use serde::de::Error; +use serde::{self, Deserializer, Serializer}; +use std::{fs, path::PathBuf}; + +use anyhow::Result; +use jsonwebtoken::{ + decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, +}; +use serde::{Deserialize, Serialize}; + +use crate::zid::ZTenantId; + +const JWT_ALGORITHM: Algorithm = Algorithm::RS256; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Scope { + Tenant, + PageServerApi, +} + +pub fn to_hex_option(value: &Option, serializer: S) -> Result +where + S: Serializer, +{ + match value { + Some(tid) => hex::serialize(tid, serializer), + None => Option::serialize(value, serializer), + } +} + +fn from_hex_option<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let opt: Option = Option::deserialize(deserializer)?; + match opt { + Some(tid) => return Ok(Some(ZTenantId::from_hex(tid).map_err(Error::custom)?)), + None => return Ok(None), + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Claims { + // this custom serialize/deserialize_with is needed because Option is not transparent to serde + // so clearest option is serde(with = "hex") but it is not working, for details see https://github.com/serde-rs/serde/issues/1301 + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "to_hex_option", + deserialize_with = "from_hex_option" + )] + pub tenant_id: Option, + pub scope: Scope, +} + +impl Claims { + pub fn new(tenant_id: Option, scope: Scope) -> Self { + Self { tenant_id, scope } + } +} + +#[derive(Debug)] +pub struct JwtAuth { + decoding_key: DecodingKey<'static>, + validation: Validation, +} + +impl JwtAuth { + pub fn new<'a>(decoding_key: DecodingKey<'a>) -> Self { + Self { + decoding_key: decoding_key.into_static(), + validation: Validation { + algorithms: vec![JWT_ALGORITHM], + validate_exp: false, 
+ ..Default::default() + }, + } + } + + pub fn from_key_path(key_path: &PathBuf) -> Result { + let public_key = fs::read_to_string(key_path)?; + Ok(Self::new(DecodingKey::from_rsa_pem(public_key.as_bytes())?)) + } + + pub fn decode(&self, token: &str) -> Result> { + Ok(decode(token, &self.decoding_key, &self.validation)?) + } +} + +// this function is used only for testing purposes in CLI e g generate tokens during init +pub fn encode_from_key_path(claims: &Claims, key_path: &PathBuf) -> Result { + let key_data = fs::read_to_string(key_path)?; + let key = EncodingKey::from_rsa_pem(&key_data.as_bytes())?; + Ok(encode(&Header::new(JWT_ALGORITHM), claims, &key)?) +} diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index ca7cac91c4..46324b8e45 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -16,3 +16,9 @@ pub mod pq_proto; // dealing with connstring parsing and handy access to it's parts pub mod connstring; + +// common authentication routines +pub mod auth; + +// utility functions and helper traits for unified unique id generation/serialization etc. +pub mod zid; diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index 1165acf867..8cde6fa7ce 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -4,12 +4,14 @@ //! is rather narrow, but we can extend it once required. use crate::pq_proto::{BeMessage, FeMessage, FeStartupMessage, StartupRequestCode}; -use anyhow::{bail, Result}; +use anyhow::{bail, ensure, Result}; use bytes::{Bytes, BytesMut}; use log::*; use rand::Rng; +use serde::{Deserialize, Serialize}; use std::io::{self, BufReader, Write}; use std::net::{Shutdown, TcpStream}; +use std::str::FromStr; pub trait Handler { /// Handle single query. 
@@ -27,9 +29,14 @@ pub trait Handler { Ok(()) } - /// Check auth + /// Check auth md5 fn check_auth_md5(&mut self, _pgb: &mut PostgresBackend, _md5_response: &[u8]) -> Result<()> { - bail!("Auth failed") + bail!("MD5 auth failed") + } + + /// Check auth jwt + fn check_auth_jwt(&mut self, _pgb: &mut PostgresBackend, _jwt_response: &[u8]) -> Result<()> { + bail!("JWT auth failed") } } @@ -42,10 +49,25 @@ pub enum ProtoState { Established, } -#[derive(Clone, Copy, PartialEq)] +#[derive(Debug, PartialEq, Clone, Copy, Serialize, Deserialize)] pub enum AuthType { Trust, MD5, + // This mimics postgres's AuthenticationCleartextPassword but instead of password expects JWT + ZenithJWT, +} + +impl FromStr for AuthType { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "Trust" => Ok(Self::Trust), + "MD5" => Ok(Self::MD5), + "ZenithJWT" => Ok(Self::ZenithJWT), + _ => bail!("invalid value \"{}\" for auth type", s), + } + } } #[derive(Clone, Copy)] @@ -125,7 +147,7 @@ impl PostgresBackend { use ProtoState::*; match state { - ProtoState::Initialization => FeStartupMessage::read(stream), + Initialization => FeStartupMessage::read(stream), Authentication | Established => FeMessage::read(stream), } } @@ -184,13 +206,13 @@ impl PostgresBackend { // Allow only startup and password messages during auth. 
Otherwise client would be able to bypass auth // TODO: change that to proper top-level match of protocol state with separate message handling for each state if self.state < ProtoState::Established { - match &msg { - FeMessage::PasswordMessage(_m) => {} - FeMessage::StartupMessage(_m) => {} - _ => { - bail!("protocol violation"); - } - } + ensure!( + matches!( + msg, + FeMessage::PasswordMessage(_) | FeMessage::StartupMessage(_) + ), + "protocol violation" + ); } match msg { @@ -224,6 +246,10 @@ impl PostgresBackend { ))?; self.state = ProtoState::Authentication; } + AuthType::ZenithJWT => { + self.write_message(&BeMessage::AuthenticationCleartextPassword)?; + self.state = ProtoState::Authentication; + } } } StartupRequestCode::Cancel => { @@ -237,21 +263,35 @@ impl PostgresBackend { assert!(self.state == ProtoState::Authentication); - let (_, md5_response) = m - .split_last() - .ok_or_else(|| anyhow::Error::msg("protocol violation"))?; + match self.auth_type { + AuthType::Trust => unreachable!(), + AuthType::MD5 => { + let (_, md5_response) = m + .split_last() + .ok_or_else(|| anyhow::Error::msg("protocol violation"))?; - if let Err(e) = handler.check_auth_md5(self, md5_response) { - self.write_message(&BeMessage::ErrorResponse(format!("{}", e)))?; - bail!("auth failed: {}", e); - } else { - self.write_message_noflush(&BeMessage::AuthenticationOk)?; - // psycopg2 will not connect if client_encoding is not - // specified by the server - self.write_message_noflush(&BeMessage::ParameterStatus)?; - self.write_message(&BeMessage::ReadyForQuery)?; - self.state = ProtoState::Established; + if let Err(e) = handler.check_auth_md5(self, md5_response) { + self.write_message(&BeMessage::ErrorResponse(format!("{}", e)))?; + bail!("auth failed: {}", e); + } + } + AuthType::ZenithJWT => { + let (_, jwt_response) = m + .split_last() + .ok_or_else(|| anyhow::Error::msg("protocol violation"))?; + + if let Err(e) = handler.check_auth_jwt(self, jwt_response) { + 
self.write_message(&BeMessage::ErrorResponse(format!("{}", e)))?; + bail!("auth failed: {}", e); + } + } } + self.write_message_noflush(&BeMessage::AuthenticationOk)?; + // psycopg2 will not connect if client_encoding is not + // specified by the server + self.write_message_noflush(&BeMessage::ParameterStatus)?; + self.write_message(&BeMessage::ReadyForQuery)?; + self.state = ProtoState::Established; } FeMessage::Query(m) => { diff --git a/zenith_utils/src/pq_proto.rs b/zenith_utils/src/pq_proto.rs index 3a9b41859e..0d1fa9d236 100644 --- a/zenith_utils/src/pq_proto.rs +++ b/zenith_utils/src/pq_proto.rs @@ -329,6 +329,7 @@ fn read_null_terminated(buf: &mut Bytes) -> anyhow::Result { pub enum BeMessage<'a> { AuthenticationOk, AuthenticationMD5Password(&'a [u8; 4]), + AuthenticationCleartextPassword, BindComplete, CommandComplete(&'a [u8]), ControlFile, @@ -471,6 +472,15 @@ impl<'a> BeMessage<'a> { .unwrap(); // write into BytesMut can't fail } + BeMessage::AuthenticationCleartextPassword => { + buf.put_u8(b'R'); + write_body(buf, |buf| { + buf.put_i32(3); // Specifies that clear text password is required. + Ok::<_, io::Error>(()) + }) + .unwrap(); // write into BytesMut can't fail + } + BeMessage::AuthenticationMD5Password(salt) => { buf.put_u8(b'R'); write_body(buf, |buf| { diff --git a/zenith_utils/src/zid.rs b/zenith_utils/src/zid.rs new file mode 100644 index 0000000000..c5b4128527 --- /dev/null +++ b/zenith_utils/src/zid.rs @@ -0,0 +1,155 @@ +use std::{fmt, str::FromStr}; + +use hex::FromHex; +use rand::Rng; +use serde::{Deserialize, Serialize}; + +// Zenith ID is a 128-bit random ID. +// Used to represent various identifiers. Provides handy utility methods and impls. 
+// TODO (LizardWizzard) figure out the best way to remove the boilerplate trait impls caused by the newtype pattern
zid_newtype { + ($t:ident) => { + impl $t { + pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> $t { + $t(ZId::get_from_buf(buf)) + } + + pub fn as_arr(&self) -> [u8; 16] { + self.0.as_arr() + } + + pub fn generate() -> Self { + $t(ZId::generate()) + } + } + + impl FromStr for $t { + type Err = hex::FromHexError; + + fn from_str(s: &str) -> Result<$t, Self::Err> { + let value = ZId::from_str(s)?; + Ok($t(value)) + } + } + + impl From<[u8; 16]> for $t { + fn from(b: [u8; 16]) -> Self { + $t(ZId::from(b)) + } + } + + impl fmt::Display for $t { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } + } + }; +} + +/// Zenith timeline IDs are different from PostgreSQL timeline +/// IDs. They serve a similar purpose though: they differentiate +/// between different "histories" of the same cluster. However, +/// PostgreSQL timeline IDs are a bit cumbersome, because they are only +/// 32-bits wide, and they must be in ascending order in any given +/// timeline history. Those limitations mean that we cannot generate a +/// new PostgreSQL timeline ID by just generating a random number. And +/// that in turn is problematic for the "pull/push" workflow, where you +/// have a local copy of a zenith repository, and you periodically sync +/// the local changes with a remote server. When you work "detached" +/// from the remote server, you cannot create a PostgreSQL timeline ID +/// that's guaranteed to be different from all existing timelines in +/// the remote server. For example, if two people are having a clone of +/// the repository on their laptops, and they both create a new branch +/// with different name. What timeline ID would they assign to their +/// branches? If they pick the same one, and later try to push the +/// branches to the same remote server, they will get mixed up. +/// +/// To avoid those issues, Zenith has its own concept of timelines that +/// is separate from PostgreSQL timelines, and doesn't have those +/// limitations. 
A zenith timeline is identified by a 128-bit ID, which +/// is usually printed out as a hex string. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ZTimelineId(ZId); + +zid_newtype!(ZTimelineId); + +// Zenith Tenant Id represents identifiar of a particular tenant. +// Is used for distinguishing requests and data belonging to different users. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] +pub struct ZTenantId(ZId); + +zid_newtype!(ZTenantId); + +// for now the following impls are used only with ZTenantId, +// if this impls become useful in other newtypes they can be moved under zid_newtype macro too +impl FromHex for ZTenantId { + type Error = hex::FromHexError; + + fn from_hex>(hex: T) -> Result { + Ok(ZTenantId(ZId::from_hex(hex)?)) + } +} + +impl AsRef<[u8]> for ZTenantId { + fn as_ref(&self) -> &[u8] { + &self.0 .0 + } +}