diff --git a/Cargo.lock b/Cargo.lock index e65936c6af..9c6e5ba00c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,6 +268,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", + "hex", "lazy_static", "nix", "pageserver", @@ -674,6 +675,9 @@ name = "hex" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +dependencies = [ + "serde", +] [[package]] name = "hex-literal" diff --git a/README.md b/README.md index 0894dc1f1a..07054b2e7e 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ pip install pytest psycopg2 2. Build zenith and patched postgres ```sh -git clone --recursive https://github.com/libzenith/zenith.git +git clone --recursive https://github.com/zenithdb/zenith.git cd zenith make -j5 ``` @@ -34,8 +34,7 @@ make -j5 # Create repository in .zenith with proper paths to binaries and data # Later that would be responsibility of a package install script > ./target/debug/zenith init -<...> -new zenith repository was created in .zenith +pageserver init succeeded # start pageserver > ./target/debug/zenith start @@ -87,7 +86,7 @@ waiting for server to start.... done # but all modifications would not affect data in original postgres > psql -p55433 -h 127.0.0.1 postgres postgres=# select * from t; - key | value + key | value -----+------- 1 | 1 (1 row) diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 1a73736b4e..520a694877 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -19,6 +19,7 @@ anyhow = "1.0" bytes = "1.0.1" nix = "0.20" url = "2.2.2" +hex = { version = "0.4.3", features = ["serde"] } pageserver = { path = "../pageserver" } walkeeper = { path = "../walkeeper" } diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index a7059e3c3f..5122f205f2 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -17,7 +17,7 @@ use regex::Regex; use zenith_utils::connstring::connection_host_port; use crate::local_env::LocalEnv; -use pageserver::ZTimelineId; +use pageserver::{ZTenantId, ZTimelineId}; use crate::storage::PageServerNode; @@ -27,27 +27,36 @@ use crate::storage::PageServerNode; pub struct ComputeControlPlane { base_port: u16, pageserver: Arc, - pub nodes: BTreeMap>, + pub nodes: BTreeMap<(ZTenantId, String), Arc>, env: LocalEnv, } impl ComputeControlPlane { // Load current nodes with ports from data directories on disk + // Directory structure has the following layout: + // pgdatadirs + // |- tenants + // | |- + // | | |- pub fn load(env: LocalEnv) -> Result { // TODO: since pageserver do not have config file yet we believe here that // it is running on default port. Change that when pageserver will have config. let pageserver = Arc::new(PageServerNode::from_env(&env)); + let mut nodes = BTreeMap::default(); let pgdatadirspath = &env.pg_data_dirs_path(); - let nodes: Result> = fs::read_dir(&pgdatadirspath) + + for tenant_dir in fs::read_dir(&pgdatadirspath) .with_context(|| format!("failed to list {}", pgdatadirspath.display()))? - .into_iter() - .map(|f| { - PostgresNode::from_dir_entry(f?, &env, &pageserver) - .map(|node| (node.name.clone(), Arc::new(node))) - }) - .collect(); - let nodes = nodes?; + { + let tenant_dir = tenant_dir?; + for timeline_dir in fs::read_dir(tenant_dir.path()) + .with_context(|| format!("failed to list {}", tenant_dir.path().display()))? 
+ { + let node = PostgresNode::from_dir_entry(timeline_dir?, &env, &pageserver)?; + nodes.insert((node.tenantid, node.name.clone()), Arc::new(node)); + } + } Ok(ComputeControlPlane { base_port: 55431, @@ -82,6 +91,7 @@ impl ComputeControlPlane { is_test: bool, timelineid: ZTimelineId, name: &str, + tenantid: ZTenantId, ) -> Result> { let node = Arc::new(PostgresNode { name: name.to_owned(), @@ -90,19 +100,26 @@ impl ComputeControlPlane { pageserver: Arc::clone(&self.pageserver), is_test, timelineid, + tenantid, }); node.init_from_page_server()?; - self.nodes.insert(node.name.clone(), Arc::clone(&node)); + self.nodes + .insert((tenantid, node.name.clone()), Arc::clone(&node)); Ok(node) } - pub fn new_node(&mut self, branch_name: &str) -> Result> { - let timeline_id = self.pageserver.branch_get_by_name(branch_name)?.timeline_id; - - let node = self.new_from_page_server(false, timeline_id, branch_name)?; - + pub fn new_node( + &mut self, + tenantid: ZTenantId, + branch_name: &str, + ) -> Result> { + let timeline_id = self + .pageserver + .branch_get_by_name(&tenantid, branch_name)? + .timeline_id; + let node = self.new_from_page_server(false, timeline_id, branch_name, tenantid)?; // Configure the node to stream WAL directly to the pageserver node.append_conf( "postgresql.conf", @@ -112,7 +129,7 @@ impl ComputeControlPlane { "synchronous_standby_names = 'pageserver'\n", // TODO: add a new function arg? "zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping ), - node.connstr() + node.connstr(), ) .as_str(), )?; @@ -123,6 +140,7 @@ impl ComputeControlPlane { /////////////////////////////////////////////////////////////////////////////// +#[derive(Debug)] pub struct PostgresNode { pub address: SocketAddr, name: String, @@ -130,6 +148,7 @@ pub struct PostgresNode { pageserver: Arc, is_test: bool, pub timelineid: ZTimelineId, + pub tenantid: ZTenantId, } impl PostgresNode { @@ -149,6 +168,8 @@ impl PostgresNode { static ref CONF_PORT_RE: Regex = Regex::new(r"(?m)^\s*port\s*=\s*(\d+)\s*$").unwrap(); static ref CONF_TIMELINE_RE: Regex = Regex::new(r"(?m)^\s*zenith.zenith_timeline\s*=\s*'(\w+)'\s*$").unwrap(); + static ref CONF_TENANT_RE: Regex = + Regex::new(r"(?m)^\s*zenith.zenith_tenant\s*=\s*'(\w+)'\s*$").unwrap(); } // parse data directory name @@ -196,6 +217,22 @@ impl PostgresNode { .parse() .with_context(|| err_msg)?; + // parse tenant + let err_msg = format!( + "failed to find tenant definition in config file {}", + cfg_path.to_str().unwrap() + ); + let tenantid = CONF_TENANT_RE + .captures(config.as_str()) + .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 1"))? + .iter() + .last() + .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 2"))? + .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 3"))? 
+ .as_str() + .parse() + .with_context(|| err_msg)?; + // ok now Ok(PostgresNode { address: SocketAddr::new("127.0.0.1".parse().unwrap(), port), @@ -204,6 +241,7 @@ impl PostgresNode { pageserver: Arc::clone(pageserver), is_test: false, timelineid, + tenantid, }) } @@ -223,7 +261,7 @@ impl PostgresNode { fs::remove_dir_all(&pgdata).ok(); } - let sql = format!("basebackup {}", self.timelineid); + let sql = format!("basebackup {} {}", self.tenantid, self.timelineid); let mut client = self .pageserver .page_server_psql_client() @@ -293,12 +331,16 @@ impl PostgresNode { let (host, port) = connection_host_port(&self.pageserver.connection_config()); self.append_conf( "postgresql.conf", - &format!( - "shared_preload_libraries = zenith \n\ - zenith.page_server_connstring = 'host={} port={}'\n\ - zenith.zenith_timeline='{}'\n", - host, port, self.timelineid - ), + format!( + concat!( + "shared_preload_libraries = zenith\n", + "zenith.page_server_connstring = 'host={} port={}'\n", + "zenith.zenith_timeline='{}'\n", + "zenith.zenith_tenant='{}'\n", + ), + host, port, self.timelineid, self.tenantid, + ) + .as_str(), )?; fs::create_dir_all(self.pgdata().join("pg_wal"))?; @@ -307,7 +349,7 @@ impl PostgresNode { } pub fn pgdata(&self) -> PathBuf { - self.env.pg_data_dir(&self.name) + self.env.pg_data_dir(&self.tenantid, &self.name) } pub fn status(&self) -> &str { diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index fabc809bfc..7f25f105bd 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -5,6 +5,8 @@ // script which will use local paths. // use anyhow::{anyhow, Result}; +use hex; +use pageserver::ZTenantId; use serde::{Deserialize, Serialize}; use std::fs; use std::path::PathBuf; @@ -16,7 +18,7 @@ pub type Remotes = BTreeMap; // // This data structures represent deserialized zenith CLI config // -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct LocalEnv { // Pageserver connection strings pub pageserver_connstring: String, @@ -33,6 +35,10 @@ pub struct LocalEnv { // Path to pageserver binary. Empty for remote pageserver. 
pub zenith_distrib_dir: Option, + // keeping tenant id in config to reduce copy paste when running zenith locally with single tenant + #[serde(with = "hex")] + pub tenantid: ZTenantId, + pub remotes: Remotes, } @@ -54,11 +60,13 @@ impl LocalEnv { } pub fn pg_data_dirs_path(&self) -> PathBuf { - self.base_data_dir.join("pgdatadirs") + self.base_data_dir.join("pgdatadirs").join("tenants") } - pub fn pg_data_dir(&self, name: &str) -> PathBuf { - self.pg_data_dirs_path().join(name) + pub fn pg_data_dir(&self, tenantid: &ZTenantId, branch_name: &str) -> PathBuf { + self.pg_data_dirs_path() + .join(tenantid.to_string()) + .join(branch_name) } // TODO: move pageserver files into ./pageserver @@ -77,7 +85,7 @@ fn base_path() -> PathBuf { // // Initialize a new Zenith repository // -pub fn init(remote_pageserver: Option<&str>) -> Result<()> { +pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()> { // check if config already exists let base_path = base_path(); if base_path.exists() { @@ -102,9 +110,6 @@ pub fn init(remote_pageserver: Option<&str>) -> Result<()> { anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir); } - fs::create_dir(&base_path)?; - fs::create_dir(base_path.join("pgdatadirs"))?; - let conf = if let Some(addr) = remote_pageserver { // check that addr is parsable let _uri = Url::parse(addr).map_err(|e| anyhow!("{}: {}", addr, e))?; @@ -115,6 +120,7 @@ pub fn init(remote_pageserver: Option<&str>) -> Result<()> { zenith_distrib_dir: None, base_data_dir: base_path, remotes: BTreeMap::default(), + tenantid, } } else { // Find zenith binaries. @@ -129,9 +135,12 @@ pub fn init(remote_pageserver: Option<&str>) -> Result<()> { zenith_distrib_dir: Some(zenith_distrib_dir), base_data_dir: base_path, remotes: BTreeMap::default(), + tenantid, } }; + fs::create_dir_all(conf.pg_data_dirs_path())?; + let toml = toml::to_string_pretty(&conf)?; fs::write(conf.base_data_dir.join("config"), toml)?; diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 3f73ceccc6..d12e8d3366 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -8,6 +8,7 @@ use std::time::Duration; use anyhow::{anyhow, bail, Result}; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; +use pageserver::ZTenantId; use postgres::{Config, NoTls}; use crate::local_env::LocalEnv; @@ -20,6 +21,7 @@ use zenith_utils::connstring::connection_address; // // Used in CLI and tests. 
// +#[derive(Debug)] pub struct PageServerNode { pub kill_on_exit: bool, pub connection_config: Option, @@ -48,16 +50,18 @@ impl PageServerNode { } } - pub fn init(&self) -> Result<()> { + pub fn init(&self, create_tenant: Option<&str>) -> Result<()> { let mut cmd = Command::new(self.env.pageserver_bin()?); + let mut args = vec![ + "--init", + "-D", + self.env.base_data_dir.to_str().unwrap(), + "--postgres-distrib", + self.env.pg_distrib_dir.to_str().unwrap(), + ]; + create_tenant.map(|tenantid| args.extend(&["--create-tenant", tenantid])); let status = cmd - .args(&[ - "--init", - "-D", - self.env.base_data_dir.to_str().unwrap(), - "--postgres-distrib", - self.env.pg_distrib_dir.to_str().unwrap(), - ]) + .args(args) .env_clear() .env("RUST_BACKTRACE", "1") .status() @@ -148,9 +152,30 @@ impl PageServerNode { self.connection_config().connect(NoTls) } - pub fn branches_list(&self) -> Result> { + pub fn tenants_list(&self) -> Result> { let mut client = self.page_server_psql_client()?; - let query_result = client.simple_query("branch_list")?; + let query_result = client.simple_query("tenant_list")?; + let tenants_json = query_result + .first() + .map(|msg| match msg { + postgres::SimpleQueryMessage::Row(row) => row.get(0), + _ => None, + }) + .flatten() + .ok_or_else(|| anyhow!("missing tenants"))?; + + Ok(serde_json::from_str(tenants_json)?) + } + + pub fn tenant_create(&self, tenantid: &ZTenantId) -> Result<()> { + let mut client = self.page_server_psql_client()?; + client.simple_query(format!("tenant_create {}", tenantid).as_str())?; + Ok(()) + } + + pub fn branches_list(&self, tenantid: &ZTenantId) -> Result> { + let mut client = self.page_server_psql_client()?; + let query_result = client.simple_query(&format!("branch_list {}", tenantid))?; let branches_json = query_result .first() .map(|msg| match msg { @@ -160,14 +185,19 @@ impl PageServerNode { .flatten() .ok_or_else(|| anyhow!("missing branches"))?; - let res: Vec = serde_json::from_str(branches_json)?; - Ok(res) + Ok(serde_json::from_str(branches_json)?) 
} - pub fn branch_create(&self, name: &str, startpoint: &str) -> Result { + pub fn branch_create( + &self, + branch_name: &str, + startpoint: &str, + tenantid: &ZTenantId, + ) -> Result { let mut client = self.page_server_psql_client()?; - let query_result = - client.simple_query(format!("branch_create {} {}", name, startpoint).as_str())?; + let query_result = client.simple_query( + format!("branch_create {} {} {}", tenantid, branch_name, startpoint).as_str(), + )?; let branch_json = query_result .first() @@ -190,8 +220,12 @@ } // TODO: make this a separate request type and avoid loading all the branches - pub fn branch_get_by_name(&self, name: &str) -> Result { - let branch_infos = self.branches_list()?; + pub fn branch_get_by_name( + &self, + tenantid: &ZTenantId, + branch_name: &str, + ) -> Result { + let branch_infos = self.branches_list(tenantid)?; let branche_by_name: Result> = branch_infos .into_iter() .map(|branch_info| Ok((branch_info.name.clone(), branch_info))) @@ -199,8 +233,8 @@ let branche_by_name = branche_by_name?; let branch = branche_by_name - .get(name) - .ok_or_else(|| anyhow!("Branch {} not found", name))?; + .get(branch_name) + .ok_or_else(|| anyhow!("Branch {} not found", branch_name))?; Ok(branch.clone()) } diff --git a/docs/multitenancy.md b/docs/multitenancy.md new file mode 100644 index 0000000000..7016509375 --- /dev/null +++ b/docs/multitenancy.md @@ -0,0 +1,59 @@ +## Multitenancy + +### Overview + +Zenith supports multitenancy: one pageserver can serve multiple tenants at once, and tenants are managed via the zenith CLI. During pageserver setup a tenant can be created with ```zenith init --create-tenant```. Tenants can also be added to a running system on the fly, without a pageserver restart, using the CLI command ```zenith tenant create```. Tenants use random identifiers, represented as 32-character hexadecimal strings, so ```zenith tenant create``` accepts the desired tenant id as an optional argument. Timelines/branches work independently for each tenant. + +### Tenants in other commands + +By default `zenith init` creates a new tenant on the pageserver. The newly created tenant's id is saved to the CLI config, so other commands can pick it up automatically when no explicit `--tenantid=` argument is provided. The tenantid therefore appears more often in the internal pageserver interface, whose commands take a tenantid argument to determine which tenant an operation should be applied to. The CLI also supports creating new tenants. + +Examples for the CLI: + +```sh +zenith tenant list + +zenith tenant create // generates new id + +zenith tenant create ee6016ec31116c1b7c33dfdfca38892f + +zenith pg create main // default tenant from zenith init + +zenith pg create main --tenantid=ee6016ec31116c1b7c33dfdfca38892f + +zenith branch --tenantid=ee6016ec31116c1b7c33dfdfca38892f +``` + +### Data layout + +On the pageserver, tenants introduce one level of indirection, so the data directory is structured the following way: +``` + +├── pageserver.log +├── pageserver.pid +├── pageserver.toml +└── tenants + ├── 537cffa58a4fa557e49e19951b5a9d6b + ├── de182bc61fb11a5a6b390a8aed3a804a + └── ee6016ec31116c1b7c33dfdfca38891f +``` +WAL redo activity, timelines, and snapshots are managed independently for each tenant. + +For the local environment used, for example, in tests, there is also a new level of indirection for tenants: the `pgdatadirs` directory now contains a `tenants` subdirectory. A sketch of how these per-tenant paths are composed, and the resulting layout, follow below.
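A minimal sketch of the path composition, mirroring `LocalEnv::pg_data_dirs_path()` and `LocalEnv::pg_data_dir()` from `control_plane/src/local_env.rs` in this change (the standalone `base` argument and the `main` function here are illustrative only, not part of the real API):

```rust
use std::path::{Path, PathBuf};

// Per-tenant data directories live under <base>/pgdatadirs/tenants/<tenantid>/<branch>.
fn pg_data_dirs_path(base: &Path) -> PathBuf {
    base.join("pgdatadirs").join("tenants")
}

fn pg_data_dir(base: &Path, tenantid: &str, branch_name: &str) -> PathBuf {
    pg_data_dirs_path(base).join(tenantid).join(branch_name)
}

fn main() {
    let dir = pg_data_dir(
        Path::new(".zenith"),
        "de182bc61fb11a5a6b390a8aed3a804a",
        "main",
    );
    // Prints ".zenith/pgdatadirs/tenants/de182bc61fb11a5a6b390a8aed3a804a/main"
    println!("{}", dir.display());
}
```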
The resulting structure looks the following way: + +``` +pgdatadirs +└── tenants + ├── de182bc61fb11a5a6b390a8aed3a804a + │ └── main + └── ee6016ec31116c1b7c33dfdfca38892f + └── main +``` + +### Changes to postgres + +The tenant id is passed to postgres via a GUC, the same way as the timeline. The tenant id is added to the commands issued to the pageserver, namely pagestream and callmemaybe. The tenant id also exists in the ServerInfo structure; this is needed to pass the value to the WAL receiver so that it can forward it to the pageserver. + +### Safety + +For now, a particular tenant can only appear on a particular pageserver. The set of WAL acceptors is also pinned to a particular (tenantid, timeline) pair, so there can only be one writer for any given (tenantid, timeline). diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 8b5e759eed..1b6b939db4 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -8,10 +8,11 @@ //! and generate pg_control and dummy segment of WAL. This module is responsible for creation of such tarball from snapshot directory and //! data stored in object storage. //! -use crate::ZTimelineId; +use crate::{PageServerConf, ZTenantId, ZTimelineId}; use bytes::{BufMut, BytesMut}; use log::*; use std::io::Write; +use std::path::PathBuf; use std::sync::Arc; use std::time::SystemTime; use tar::{Builder, Header}; @@ -32,7 +33,7 @@ pub struct Basebackup<'a> { timeline: &'a Arc, lsn: Lsn, prev_record_lsn: Lsn, - snappath: String, + snappath: PathBuf, slru_buf: [u8; pg_constants::SLRU_SEG_SIZE], slru_segno: u32, slru_path: &'static str, } impl<'a> Basebackup<'a> { pub fn new( + conf: &PageServerConf, write: &'a mut dyn Write, + tenantid: ZTenantId, timelineid: ZTimelineId, timeline: &'a Arc, lsn: Lsn, @@ -52,7 +55,9 @@ timeline, lsn, prev_record_lsn, - snappath: format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0), + snappath: conf + .snapshots_path(&timelineid, &tenantid) + .join(format!("{:016X}", snapshot_lsn.0)), slru_path: "", slru_segno: u32::MAX, slru_buf: [0u8; pg_constants::SLRU_SEG_SIZE], } } pub fn send_tarball(&mut self) -> anyhow::Result<()> { - debug!("sending tarball of snapshot in {}", self.snappath); + debug!("sending tarball of snapshot in {}", self.snappath.display()); for entry in WalkDir::new(&self.snappath) { let entry = entry?; let fullpath = entry.path(); @@ -179,7 +184,7 @@ } else { // User defined tablespaces are not supported assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID); - let src_path = format!("{}/base/1/PG_VERSION", self.snappath); + let src_path = self.snappath.join("base/1/PG_VERSION"); let dst_path = format!("base/{}/PG_VERSION", db.dbnode); self.ar.append_path_with_name(&src_path, &dst_path)?; format!("base/{}/pg_filenode.map", db.dbnode) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 4606f08bc6..31f96e366e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -6,18 +6,18 @@ use log::*; use serde::{Deserialize, Serialize}; use std::{ env, - net::{TcpListener}, + net::TcpListener, path::{Path, PathBuf}, process::exit, thread, time::Duration, }; -use anyhow::{Result}; +use anyhow::Result; use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; -use pageserver::{branches, page_cache, page_service, tui, logger, PageServerConf}; +use pageserver::{branches,
logger, page_cache, page_service, tui, PageServerConf}; const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000"; @@ -158,6 +158,13 @@ fn main() -> Result<()> { .takes_value(true) .help("Postgres distribution directory"), ) + .arg( + Arg::with_name("create-tenant") + .long("create-tenant") + .takes_value(true) + .help("Create tenant during init") + .requires("init"), + ) .get_matches(); let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith")); @@ -166,6 +173,8 @@ fn main() -> Result<()> { let args_params = CfgFileParams::from_args(&arg_matches); let init = arg_matches.is_present("init"); + let create_tenant = arg_matches.value_of("create-tenant"); + let params = if init { // We're initializing the repo, so there's no config file yet args_params @@ -199,8 +208,7 @@ fn main() -> Result<()> { // Create repo and exit if init was requested if init { - branches::init_repo(conf, &workdir)?; - + branches::init_pageserver(conf, workdir, create_tenant)?; // write the config file let cfg_file_contents = toml::to_string_pretty(¶ms)?; std::fs::write(&cfg_file_path, cfg_file_contents)?; @@ -215,7 +223,6 @@ fn main() -> Result<()> { } fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { - // Initialize logger let (_scope_guard, log_file) = logger::init_logging(&conf, "pageserver.log")?; let _log_guard = slog_stdlog::init()?; diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index c0e26746b0..0a2218ae35 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -4,15 +4,15 @@ // TODO: move all paths construction to conf impl // -use anyhow::{bail, Context, Result}; +use anyhow::{bail, ensure, Context, Result}; use fs::File; use postgres_ffi::{pg_constants, xlog_utils, ControlFileData}; -use rand::Rng; use serde::{Deserialize, Serialize}; use std::env; -use std::io::{Read, Write}; +use std::io::Read; +use std::sync::Arc; use std::{ - fs, io, + fs, path::{Path, PathBuf}, process::{Command, Stdio}, str::FromStr, @@ -22,8 +22,11 @@ use zenith_utils::lsn::Lsn; use log::*; use crate::logger; +use crate::object_repository::ObjectRepository; use crate::page_cache; use crate::restore_local_repo; +use crate::walredo::WalRedoManager; +use crate::ZTenantId; use crate::{repository::Repository, PageServerConf, ZTimelineId}; #[derive(Serialize, Deserialize, Clone)] @@ -41,35 +44,61 @@ pub struct PointInTime { pub lsn: Lsn, } -pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { - // top-level dir may exist if we are creating it through CLI - fs::create_dir_all(repo_dir) - .with_context(|| format!("could not create directory {}", repo_dir.display()))?; - - env::set_current_dir(repo_dir)?; - +pub fn init_pageserver( + conf: &'static PageServerConf, + workdir: &Path, + create_tenant: Option<&str>, +) -> Result<()> { // Initialize logger let (_scope_guard, _log_file) = logger::init_logging(&conf, "pageserver.log")?; let _log_guard = slog_stdlog::init()?; + env::set_current_dir(workdir)?; + if let Some(tenantid) = create_tenant { + let tenantid = ZTenantId::from_str(tenantid)?; + println!("initializing tenantid {}", tenantid); + create_repo( + conf, + tenantid, + Arc::new(crate::walredo::DummyRedoManager {}), + ) + .with_context(|| "failed to create repo")?; + } + fs::create_dir_all(conf.tenants_path())?; + println!("pageserver init succeeded"); + Ok(()) +} + +pub fn create_repo( + conf: &'static PageServerConf, + tenantid: ZTenantId, + wal_redo_manager: Arc, +) -> Result { + let repo_dir = conf.tenant_path(&tenantid); + if 
repo_dir.exists() { + bail!("repo for {} already exists", tenantid) + } + + // top-level dir may exist if we are creating it through CLI + fs::create_dir_all(&repo_dir) + .with_context(|| format!("could not create directory {}", repo_dir.display()))?; + // Note: this `info!(...)` macro comes from `log` crate info!("standard logging redirected to slog"); - fs::create_dir(std::path::Path::new("timelines"))?; - fs::create_dir(std::path::Path::new("refs"))?; - fs::create_dir(std::path::Path::new("refs").join("branches"))?; - fs::create_dir(std::path::Path::new("refs").join("tags"))?; + fs::create_dir(conf.timelines_path(&tenantid))?; + fs::create_dir_all(conf.branches_path(&tenantid))?; + fs::create_dir_all(conf.tags_path(&tenantid))?; - println!("created directory structure in {}", repo_dir.display()); + info!("created directory structure in {}", repo_dir.display()); // Run initdb // // We create the cluster temporarily in a "tmp" directory inside the repository, // and move it to the right location from there. - let tmppath = std::path::Path::new("tmp"); + let tmppath = conf.tenant_path(&tenantid).join("tmp"); - print!("running initdb... "); - io::stdout().flush()?; + info!("running initdb... "); let initdb_path = conf.pg_bin_dir().join("initdb"); let initdb_otput = Command::new(initdb_path) @@ -88,7 +117,7 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { String::from_utf8_lossy(&initdb_otput.stderr) ); } - println!("initdb succeeded"); + info!("initdb succeeded"); // Read control file to extract the LSN and system id let controlfile_path = tmppath.join("global").join("pg_control"); @@ -98,8 +127,8 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { let lsnstr = format!("{:016X}", lsn); // Bootstrap the repository by loading the newly-initdb'd cluster into 'main' branch. - let tli = create_timeline(conf, None)?; - let timelinedir = conf.timeline_path(tli); + let tli = create_timeline(conf, None, &tenantid)?; + let timelinedir = conf.timeline_path(&tli, &tenantid); // We don't use page_cache here, because we don't want to spawn the WAL redo thread during // repository initialization. @@ -110,12 +139,13 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { // and we failed to run initdb again in the same directory. This has been solved for the // rapid init+start case now, but the general race condition remains if you restart the // server quickly. 
- let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?; + let storage = crate::rocksdb_storage::RocksObjectStore::create(conf, &tenantid)?; let repo = crate::object_repository::ObjectRepository::new( conf, std::sync::Arc::new(storage), - std::sync::Arc::new(crate::walredo::DummyRedoManager {}), + wal_redo_manager, + tenantid, ); let timeline = repo.create_empty_timeline(tli, Lsn(lsn))?; @@ -128,11 +158,11 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { .join("wal") .join("000000010000000000000001.partial"), )?; - println!("created initial timeline {}", tli); + info!("created initial timeline {}", tli); let data = tli.to_string(); - fs::write(conf.branch_path("main"), data)?; - println!("created main branch"); + fs::write(conf.branch_path("main", &tenantid), data)?; + info!("created main branch"); // Remove pg_wal fs::remove_dir_all(tmppath.join("pg_wal"))?; @@ -143,20 +173,32 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { let target = timelinedir.join("snapshots").join(&lsnstr); fs::rename(tmppath, &target)?; - println!( + info!( "new zenith repository was created in {}", repo_dir.display() ); - Ok(()) + Ok(repo) } -pub(crate) fn get_branches(conf: &PageServerConf) -> Result> { - let repo = page_cache::get_repository(); +pub(crate) fn get_tenants(conf: &PageServerConf) -> Result> { + let tenants_dir = conf.tenants_path(); + + std::fs::read_dir(&tenants_dir)? + .map(|dir_entry_res| { + let dir_entry = dir_entry_res?; + ensure!(dir_entry.file_type()?.is_dir()); + Ok(dir_entry.file_name().to_str().unwrap().to_owned()) + }) + .collect() +} + +pub(crate) fn get_branches(conf: &PageServerConf, tenantid: &ZTenantId) -> Result> { + let repo = page_cache::get_repository_for_tenant(tenantid)?; // Each branch has a corresponding record (text file) in the refs/branches // with timeline_id. - let branches_dir = std::path::Path::new("refs").join("branches"); + let branches_dir = conf.branches_path(tenantid); std::fs::read_dir(&branches_dir)? 
.map(|dir_entry_res| { @@ -169,7 +211,7 @@ pub(crate) fn get_branches(conf: &PageServerConf) -> Result> { .map(|timeline| timeline.get_last_valid_lsn()) .ok(); - let ancestor_path = conf.ancestor_path(timeline_id); + let ancestor_path = conf.ancestor_path(&timeline_id, tenantid); let mut ancestor_id: Option = None; let mut ancestor_lsn: Option = None; @@ -206,14 +248,15 @@ pub(crate) fn create_branch( conf: &PageServerConf, branchname: &str, startpoint_str: &str, + tenantid: &ZTenantId, ) -> Result { - let repo = page_cache::get_repository(); + let repo = page_cache::get_repository_for_tenant(tenantid)?; - if conf.branch_path(&branchname).exists() { + if conf.branch_path(branchname, tenantid).exists() { anyhow::bail!("branch {} already exists", branchname); } - let mut startpoint = parse_point_in_time(conf, startpoint_str)?; + let mut startpoint = parse_point_in_time(conf, startpoint_str, tenantid)?; if startpoint.lsn == Lsn(0) { // Find end of WAL on the old timeline @@ -225,19 +268,20 @@ pub(crate) fn create_branch( } // create a new timeline directory for it - let newtli = create_timeline(conf, Some(startpoint))?; - let newtimelinedir = conf.timeline_path(newtli); + let newtli = create_timeline(conf, Some(startpoint), tenantid)?; + let newtimelinedir = conf.timeline_path(&newtli, tenantid); // Let the Repository backend do its initialization repo.branch_timeline(startpoint.timelineid, newtli, startpoint.lsn)?; // Copy the latest snapshot (TODO: before the startpoint) and all WAL // TODO: be smarter and avoid the copying... - let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(conf, startpoint.timelineid)?; + let (_maxsnapshot, oldsnapshotdir) = + find_latest_snapshot(conf, &startpoint.timelineid, tenantid)?; let copy_opts = fs_extra::dir::CopyOptions::new(); fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), ©_opts)?; - let oldtimelinedir = conf.timeline_path(startpoint.timelineid); + let oldtimelinedir = conf.timeline_path(&startpoint.timelineid, tenantid); copy_wal( &oldtimelinedir.join("wal"), &newtimelinedir.join("wal"), @@ -249,7 +293,7 @@ pub(crate) fn create_branch( // FIXME: there's a race condition, if you create a branch with the same // name concurrently. 
let data = newtli.to_string(); - fs::write(conf.branch_path(&branchname), data)?; + fs::write(conf.branch_path(&branchname, tenantid), data)?; Ok(BranchInfo { name: branchname.to_string(), @@ -279,7 +323,11 @@ pub(crate) fn create_branch( // mytag // // -fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { +fn parse_point_in_time( + conf: &PageServerConf, + s: &str, + tenantid: &ZTenantId, +) -> Result { let mut strings = s.split('@'); let name = strings.next().unwrap(); @@ -294,21 +342,21 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { // Check if it's a tag if lsn.is_none() { - let tagpath = conf.tag_path(name); + let tagpath = conf.tag_path(name, &tenantid); if tagpath.exists() { let pointstr = fs::read_to_string(tagpath)?; - return parse_point_in_time(conf, &pointstr); + return parse_point_in_time(conf, &pointstr, &tenantid); } } // Check if it's a branch // Check if it's branch @ LSN - let branchpath = conf.branch_path(name); + let branchpath = conf.branch_path(name, &tenantid); if branchpath.exists() { let pointstr = fs::read_to_string(branchpath)?; - let mut result = parse_point_in_time(conf, &pointstr)?; + let mut result = parse_point_in_time(conf, &pointstr, &tenantid)?; result.lsn = lsn.unwrap_or(Lsn(0)); return Ok(result); @@ -317,7 +365,7 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { // Check if it's a timelineid // Check if it's timelineid @ LSN if let Ok(timelineid) = ZTimelineId::from_str(name) { - let tlipath = conf.timeline_path(timelineid); + let tlipath = conf.timeline_path(&timelineid, &tenantid); if tlipath.exists() { return Ok(PointInTime { timelineid, @@ -329,13 +377,16 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { bail!("could not parse point-in-time {}", s); } -fn create_timeline(conf: &PageServerConf, ancestor: Option) -> Result { +fn create_timeline( + conf: &PageServerConf, + ancestor: Option, + tenantid: &ZTenantId, +) -> Result { // Create initial timeline - let mut tli_buf = [0u8; 16]; - rand::thread_rng().fill(&mut tli_buf); - let timelineid = ZTimelineId::from(tli_buf); - let timelinedir = conf.timeline_path(timelineid); + let timelineid = ZTimelineId::generate(); + + let timelinedir = conf.timeline_path(&timelineid, tenantid); fs::create_dir(&timelinedir)?; fs::create_dir(&timelinedir.join("snapshots"))?; @@ -397,8 +448,12 @@ fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: usize) -> R } // Find the latest snapshot for a timeline -fn find_latest_snapshot(conf: &PageServerConf, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> { - let snapshotsdir = conf.snapshots_path(timeline); +fn find_latest_snapshot( + conf: &PageServerConf, + timelineid: &ZTimelineId, + tenantid: &ZTenantId, +) -> Result<(Lsn, PathBuf)> { + let snapshotsdir = conf.snapshots_path(timelineid, tenantid); let paths = fs::read_dir(&snapshotsdir)?; let mut maxsnapshot = Lsn(0); let mut snapshotdir: Option = None; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 8a3b89e02f..f9c3703d77 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,3 +1,5 @@ +use hex::FromHex; +use rand::Rng; use serde::{Deserialize, Serialize}; use std::fmt; @@ -48,24 +50,48 @@ impl PageServerConf { // Repository paths, relative to workdir. 
// - fn tag_path(&self, name: &str) -> PathBuf { - self.workdir.join("refs").join("tags").join(name) + fn tenants_path(&self) -> PathBuf { + self.workdir.join("tenants") } - fn branch_path(&self, name: &str) -> PathBuf { - self.workdir.join("refs").join("branches").join(name) + fn tenant_path(&self, tenantid: &ZTenantId) -> PathBuf { + self.tenants_path().join(tenantid.to_string()) } - fn timeline_path(&self, timelineid: ZTimelineId) -> PathBuf { - self.workdir.join("timelines").join(timelineid.to_string()) + fn tags_path(&self, tenantid: &ZTenantId) -> PathBuf { + self.tenant_path(tenantid).join("refs").join("tags") } - fn snapshots_path(&self, timelineid: ZTimelineId) -> PathBuf { - self.timeline_path(timelineid).join("snapshots") + fn tag_path(&self, tag_name: &str, tenantid: &ZTenantId) -> PathBuf { + self.tags_path(tenantid).join(tag_name) } - fn ancestor_path(&self, timelineid: ZTimelineId) -> PathBuf { - self.timeline_path(timelineid).join("ancestor") + fn branches_path(&self, tenantid: &ZTenantId) -> PathBuf { + self.tenant_path(tenantid).join("refs").join("branches") + } + + fn branch_path(&self, branch_name: &str, tenantid: &ZTenantId) -> PathBuf { + self.branches_path(tenantid).join(branch_name) + } + + fn timelines_path(&self, tenantid: &ZTenantId) -> PathBuf { + self.tenant_path(tenantid).join("timelines") + } + + fn timeline_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { + self.timelines_path(tenantid).join(timelineid.to_string()) + } + + fn snapshots_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { + self.timeline_path(timelineid, tenantid).join("snapshots") + } + + fn ancestor_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { + self.timeline_path(timelineid, tenantid).join("ancestor") + } + + fn wal_dir_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { + self.timeline_path(timelineid, tenantid).join("wal") } // @@ -81,8 +107,67 @@ impl PageServerConf { } } -/// Zenith Timeline ID is a 128-bit random ID. -/// +// Zenith ID is a 128-bit random ID. +// Used to represent various identifiers. Provides handy utility methods and impls. +// TODO (LizardWizzard) figure out best way to remove boiler plate with trait impls caused by newtype pattern +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] +struct ZId([u8; 16]); + +impl ZId { + pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZId { + let mut arr = [0u8; 16]; + buf.copy_to_slice(&mut arr); + ZId::from(arr) + } + + pub fn as_arr(&self) -> [u8; 16] { + self.0 + } + + pub fn generate() -> Self { + let mut tli_buf = [0u8; 16]; + rand::thread_rng().fill(&mut tli_buf); + ZId::from(tli_buf) + } +} + +impl FromStr for ZId { + type Err = hex::FromHexError; + + fn from_str(s: &str) -> Result { + Self::from_hex(s) + } +} + +// this is needed for pretty serialization and deserialization of ZId's using serde integration with hex crate +impl FromHex for ZId { + type Error = hex::FromHexError; + + fn from_hex>(hex: T) -> Result { + let mut buf: [u8; 16] = [0u8; 16]; + hex::decode_to_slice(hex, &mut buf)?; + Ok(ZId(buf)) + } +} + +impl AsRef<[u8]> for ZId { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl From<[u8; 16]> for ZId { + fn from(b: [u8; 16]) -> Self { + ZId(b) + } +} + +impl fmt::Display for ZId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&hex::encode(self.0)) + } +} + /// Zenith timeline IDs are different from PostgreSQL timeline /// IDs. 
They serve a similar purpose though: they differentiate /// between different "histories" of the same cluster. However, @@ -106,38 +191,93 @@ /// limitations. A zenith timeline is identified by a 128-bit ID, which /// is usually printed out as a hex string. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct ZTimelineId([u8; 16]); +pub struct ZTimelineId(ZId); impl FromStr for ZTimelineId { type Err = hex::FromHexError; fn from_str(s: &str) -> Result { - let timelineid = hex::decode(s)?; + let value = ZId::from_str(s)?; + Ok(ZTimelineId(value)) + } +} - let mut buf: [u8; 16] = [0u8; 16]; - buf.copy_from_slice(timelineid.as_slice()); - Ok(ZTimelineId(buf)) +impl From<[u8; 16]> for ZTimelineId { + fn from(b: [u8; 16]) -> Self { + ZTimelineId(ZId::from(b)) } } impl ZTimelineId { - pub fn from(b: [u8; 16]) -> ZTimelineId { - ZTimelineId(b) - } - pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTimelineId { - let mut arr = [0u8; 16]; - buf.copy_to_slice(&mut arr); - ZTimelineId::from(arr) + ZTimelineId(ZId::get_from_buf(buf)) } pub fn as_arr(&self) -> [u8; 16] { - self.0 + self.0.as_arr() + } + + pub fn generate() -> Self { + ZTimelineId(ZId::generate()) } } impl fmt::Display for ZTimelineId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&hex::encode(self.0)) + self.0.fmt(f) + } +} + +// Zenith Tenant Id represents the identifier of a particular tenant. +// It is used to distinguish requests and data belonging to different users. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] +pub struct ZTenantId(ZId); + +impl FromStr for ZTenantId { + type Err = hex::FromHexError; + + fn from_str(s: &str) -> Result { + let value = ZId::from_str(s)?; + Ok(ZTenantId(value)) + } +} + +impl From<[u8; 16]> for ZTenantId { + fn from(b: [u8; 16]) -> Self { + ZTenantId(ZId::from(b)) + } +} + +impl FromHex for ZTenantId { + type Error = hex::FromHexError; + + fn from_hex>(hex: T) -> Result { + Ok(ZTenantId(ZId::from_hex(hex)?)) + } +} + +impl AsRef<[u8]> for ZTenantId { + fn as_ref(&self) -> &[u8] { + &self.0 .0 + } +} + +impl ZTenantId { + pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTenantId { + ZTenantId(ZId::get_from_buf(buf)) + } + + pub fn as_arr(&self) -> [u8; 16] { + self.0.as_arr() + } + + pub fn generate() -> Self { + ZTenantId(ZId::generate()) + } +} + +impl fmt::Display for ZTenantId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) } } diff --git a/pageserver/src/logger.rs b/pageserver/src/logger.rs index fe3ed381f2..b1a98bbb7e 100644 --- a/pageserver/src/logger.rs +++ b/pageserver/src/logger.rs @@ -1,14 +1,13 @@ use crate::{tui, PageServerConf}; -use std::fs::{OpenOptions, File}; use anyhow::{Context, Result}; use slog::{Drain, FnValue}; +use std::fs::{File, OpenOptions}; pub fn init_logging( conf: &PageServerConf, - log_filename: &str + log_filename: &str, ) -> Result<(slog_scope::GlobalLoggerGuard, File)> { - // Don't open the same file for output multiple times; // the different fds could overwrite each other's output. let log_file = OpenOptions::new() diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 366055ce6d..fbc607f24b 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -13,11 +13,11 @@ //! until we find the page we're looking for, making a separate lookup into the //! key-value store for each timeline.
-use crate::object_key::*; use crate::object_store::ObjectStore; use crate::repository::*; use crate::restore_local_repo::import_timeline_wal; use crate::walredo::WalRedoManager; +use crate::{object_key::*, ZTenantId}; use crate::{PageServerConf, ZTimelineId}; use anyhow::{bail, Context, Result}; use bytes::Bytes; @@ -42,6 +42,7 @@ pub struct ObjectRepository { conf: &'static PageServerConf, timelines: Mutex>>, walredo_mgr: Arc, + tenantid: ZTenantId, } // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. @@ -52,12 +53,14 @@ impl ObjectRepository { conf: &'static PageServerConf, obj_store: Arc, walredo_mgr: Arc, + tenantid: ZTenantId, ) -> ObjectRepository { ObjectRepository { conf, obj_store, timelines: Mutex::new(HashMap::new()), walredo_mgr, + tenantid, } } } @@ -82,7 +85,7 @@ impl Repository for ObjectRepository { timelineid, timeline.get_last_record_lsn() ); - let wal_dir = self.conf.timeline_path(timelineid).join("wal"); + let wal_dir = self.conf.wal_dir_path(&timelineid, &self.tenantid); import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?; let timeline_rc = Arc::new(timeline); diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index add3e3ba7f..4e49cbef3e 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -1,35 +1,51 @@ //! This module acts as a switchboard to access different repositories managed by this -//! page server. Currently, a Page Server can only manage one repository, so there -//! isn't much here. If we implement multi-tenancy, this will probably be changed into -//! a hash map, keyed by the tenant ID. +//! page server. use crate::object_repository::ObjectRepository; use crate::repository::Repository; use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::PostgresRedoManager; -use crate::PageServerConf; +use crate::{PageServerConf, ZTenantId}; +use anyhow::{anyhow, Result}; use lazy_static::lazy_static; +use log::info; +use std::collections::HashMap; +use std::fs; +use std::str::FromStr; use std::sync::{Arc, Mutex}; lazy_static! { - pub static ref REPOSITORY: Mutex>> = Mutex::new(None); + pub static ref REPOSITORY: Mutex>> = + Mutex::new(HashMap::new()); } pub fn init(conf: &'static PageServerConf) { let mut m = REPOSITORY.lock().unwrap(); - let obj_store = RocksObjectStore::open(conf).unwrap(); + for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() { + let tenantid = + ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap(); + let obj_store = RocksObjectStore::open(conf, &tenantid).unwrap(); - // Set up a WAL redo manager, for applying WAL records. - let walredo_mgr = PostgresRedoManager::new(conf); + // Set up a WAL redo manager, for applying WAL records. + let walredo_mgr = PostgresRedoManager::new(conf, tenantid); - // we have already changed current dir to the repository. - let repo = ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr)); - - *m = Some(Arc::new(repo)); + // Set up an object repository, for actual data storage. 
+ let repo = + ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr), tenantid); + info!("initialized storage for tenant: {}", &tenantid); + m.insert(tenantid, Arc::new(repo)); + } } -pub fn get_repository() -> Arc { +pub fn insert_repository_for_tenant(tenantid: ZTenantId, repo: Arc) { + let o = &mut REPOSITORY.lock().unwrap(); + o.insert(tenantid, repo); +} + +pub fn get_repository_for_tenant(tenantid: &ZTenantId) -> Result> { let o = &REPOSITORY.lock().unwrap(); - Arc::clone(o.as_ref().unwrap()) + o.get(tenantid) + .map(|repo| Arc::clone(repo)) + .ok_or_else(|| anyhow!("repository not found for tenant name {}", tenantid)) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 8ff742fbfc..01c9c8e4b1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -10,13 +10,14 @@ // *callmemaybe $url* -- ask pageserver to start walreceiver on $url // -use anyhow::{anyhow, bail}; +use anyhow::{anyhow, bail, ensure}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use regex::Regex; use std::io::Write; use std::net::TcpListener; use std::str::FromStr; +use std::sync::Arc; use std::thread; use std::{io, net::TcpStream}; use zenith_utils::postgres_backend::PostgresBackend; @@ -33,7 +34,9 @@ use crate::page_cache; use crate::repository::{BufferTag, Modification, RelTag}; use crate::restore_local_repo; use crate::walreceiver; +use crate::walredo::PostgresRedoManager; use crate::PageServerConf; +use crate::ZTenantId; use crate::ZTimelineId; // Wrapped in libpq CopyData @@ -180,9 +183,10 @@ impl PageServerHandler { &self, pgb: &mut PostgresBackend, timelineid: ZTimelineId, + tenantid: ZTenantId, ) -> anyhow::Result<()> { // Check that the timeline exists - let repository = page_cache::get_repository(); + let repository = page_cache::get_repository_for_tenant(&tenantid)?; let timeline = repository.get_timeline(timelineid).map_err(|_| { anyhow!( "client requested pagestream on timeline {} which does not exist in page server", @@ -274,9 +278,10 @@ impl PageServerHandler { pgb: &mut PostgresBackend, timelineid: ZTimelineId, lsn: Option, + tenantid: ZTenantId, ) -> anyhow::Result<()> { // check that the timeline exists - let repository = page_cache::get_repository(); + let repository = page_cache::get_repository_for_tenant(&tenantid)?; let timeline = repository.get_timeline(timelineid).map_err(|e| { error!("error fetching timeline: {:?}", e); anyhow!( @@ -292,14 +297,16 @@ impl PageServerHandler { // find latest snapshot let snapshot_lsn = - restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); + restore_local_repo::find_latest_snapshot(&self.conf, &timelineid, &tenantid).unwrap(); let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn()); { let mut writer = CopyDataSink { pgb }; let mut basebackup = basebackup::Basebackup::new( + self.conf, &mut writer, + tenantid, timelineid, &timeline, req_lsn, @@ -328,84 +335,102 @@ impl postgres_backend::Handler for PageServerHandler { if query_string.last() == Some(&0) { query_string.truncate(query_string.len() - 1); } + let query_string = std::str::from_utf8(&query_string)?; - if query_string.starts_with(b"controlfile") { + if query_string.starts_with("controlfile") { self.handle_controlfile(pgb)?; - } else if query_string.starts_with(b"pagestream ") { - let (_l, r) = query_string.split_at("pagestream ".len()); - let timelineid_str = String::from_utf8(r.to_vec())?; - let timelineid = ZTimelineId::from_str(&timelineid_str)?; + } else if 
query_string.starts_with("pagestream ") { + let (_, params_raw) = query_string.split_at("pagestream ".len()); + let params = params_raw.split(" ").collect::>(); + ensure!( + params.len() == 2, + "invalid param number for pagestream command" + ); + let tenantid = ZTenantId::from_str(params[0])?; + let timelineid = ZTimelineId::from_str(params[1])?; - self.handle_pagerequests(pgb, timelineid)?; - } else if query_string.starts_with(b"basebackup ") { - let (_l, r) = query_string.split_at("basebackup ".len()); - let r = r.to_vec(); - let basebackup_args = String::from(String::from_utf8(r)?.trim_end()); - let args: Vec<&str> = basebackup_args.rsplit(' ').collect(); - let timelineid_str = args[0]; - info!("got basebackup command: \"{}\"", timelineid_str); - let timelineid = ZTimelineId::from_str(&timelineid_str)?; - let lsn = if args.len() > 1 { - Some(Lsn::from_str(args[1])?) + self.handle_pagerequests(pgb, timelineid, tenantid)?; + } else if query_string.starts_with("basebackup ") { + let (_, params_raw) = query_string.split_at("basebackup ".len()); + let params = params_raw.split(" ").collect::>(); + // Accept either two params (tenantid, timelineid) or three (plus an lsn), + // so the optional-lsn branch below is actually reachable. + ensure!( + params.len() == 2 || params.len() == 3, + "invalid param number for basebackup command" + ); + + let tenantid = ZTenantId::from_str(params[0])?; + let timelineid = ZTimelineId::from_str(params[1])?; + + // TODO are there any tests with lsn option? + let lsn = if params.len() == 3 { + Some(Lsn::from_str(params[2])?) } else { None }; + info!( + "got basebackup command. tenantid=\"{}\" timelineid=\"{}\" lsn=\"{:#?}\"", + tenantid, timelineid, lsn + ); + // Check that the timeline exists - self.handle_basebackup_request(pgb, timelineid, lsn)?; + self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"callmemaybe ") { - let query_str = String::from_utf8(query_string.to_vec())?; - - // callmemaybe + } else if query_string.starts_with("callmemaybe ") { + // callmemaybe // TODO lazy static - let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) (.*)$").unwrap(); + let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap(); let caps = re - .captures(&query_str) - .ok_or_else(|| anyhow!("invalid callmemaybe: '{}'", query_str))?; + .captures(query_string) + .ok_or_else(|| anyhow!("invalid callmemaybe: '{}'", query_string))?; - let timelineid = ZTimelineId::from_str(caps.get(1).unwrap().as_str())?; - let connstr: String = String::from(caps.get(2).unwrap().as_str()); + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let connstr = caps.get(3).unwrap().as_str().to_owned(); // Check that the timeline exists - let repository = page_cache::get_repository(); + let repository = page_cache::get_repository_for_tenant(&tenantid)?; if repository.get_timeline(timelineid).is_err() { bail!("client requested callmemaybe on timeline {} which does not exist in page server", timelineid); } - walreceiver::launch_wal_receiver(&self.conf, timelineid, &connstr); + walreceiver::launch_wal_receiver(&self.conf, timelineid, &connstr, tenantid.to_owned()); pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"branch_create ") { - let query_str = String::from_utf8(query_string.to_vec())?; - let err = || anyhow!("invalid branch_create: '{}'", query_str); + } else if query_string.starts_with("branch_create ") { + let err = || anyhow!("invalid branch_create: '{}'",
query_string); - // branch_create + // branch_create // TODO lazy static // TOOD: escaping, to allow branch names with spaces - let re = Regex::new(r"^branch_create (\S+) ([^\r\n\s;]+)[\r\n\s;]*;?$").unwrap(); - let caps = re.captures(&query_str).ok_or_else(err)?; + let re = Regex::new(r"^branch_create ([[:xdigit:]]+) (\S+) ([^\r\n\s;]+)[\r\n\s;]*;?$") + .unwrap(); + let caps = re.captures(&query_string).ok_or_else(err)?; - let branchname: String = String::from(caps.get(1).ok_or_else(err)?.as_str()); - let startpoint_str: String = String::from(caps.get(2).ok_or_else(err)?.as_str()); + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let branchname = caps.get(2).ok_or_else(err)?.as_str().to_owned(); + let startpoint_str = caps.get(3).ok_or_else(err)?.as_str().to_owned(); - let branch = branches::create_branch(&self.conf, &branchname, &startpoint_str)?; + let branch = + branches::create_branch(&self.conf, &branchname, &startpoint_str, &tenantid)?; let branch = serde_json::to_vec(&branch)?; pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? .write_message_noflush(&BeMessage::DataRow(&[Some(&branch)]))? .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"push ") { - let query_str = std::str::from_utf8(&query_string)?; - let mut it = query_str.split(' '); - it.next().unwrap(); - let timeline_id: ZTimelineId = it - .next() - .ok_or_else(|| anyhow!("missing timeline id"))? - .parse()?; + } else if query_string.starts_with("push ") { + // push + let re = Regex::new(r"^push ([[:xdigit:]]+) ([[:xdigit:]]+)$").unwrap(); + + let caps = re + .captures(query_string) + .ok_or_else(|| anyhow!("invalid push: '{}'", query_string))?; + + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; let start_lsn = Lsn(0); // TODO this needs to come from the repo - let timeline = - page_cache::get_repository().create_empty_timeline(timeline_id, start_lsn)?; + let timeline = page_cache::get_repository_for_tenant(&tenantid)? + .create_empty_timeline(timelineid, start_lsn)?; pgb.write_message(&BeMessage::CopyInResponse)?; @@ -433,22 +458,23 @@ impl postgres_backend::Handler for PageServerHandler { } pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"request_push ") { - let query_str = std::str::from_utf8(&query_string)?; - let mut it = query_str.split(' '); - it.next().unwrap(); + } else if query_string.starts_with("request_push ") { + // request_push + let re = Regex::new(r"^request_push ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap(); - let timeline_id: ZTimelineId = it - .next() - .ok_or_else(|| anyhow!("missing timeline id"))? 
- .parse()?; - let timeline = page_cache::get_repository().get_timeline(timeline_id)?; + let caps = re + .captures(query_string) + .ok_or_else(|| anyhow!("invalid request_push: '{}'", query_string))?; - let postgres_connection_uri = - it.next().ok_or_else(|| anyhow!("missing postgres uri"))?; + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let postgres_connection_uri = caps.get(3).unwrap().as_str(); + + let timeline = + page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?; let mut conn = postgres::Client::connect(postgres_connection_uri, postgres::NoTls)?; - let mut copy_in = conn.copy_in(format!("push {}", timeline_id.to_string()).as_str())?; + let mut copy_in = conn.copy_in(format!("push {}", timelineid.to_string()).as_str())?; let history = timeline.history()?; for update_res in history { @@ -461,44 +487,76 @@ impl postgres_backend::Handler for PageServerHandler { copy_in.finish()?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"branch_list") { - let branches = crate::branches::get_branches(&self.conf)?; + } else if query_string.starts_with("branch_list ") { + // branch_list + let re = Regex::new(r"^branch_list ([[:xdigit:]]+)$").unwrap(); + let caps = re + .captures(query_string) + .ok_or_else(|| anyhow!("invalid branch_list: '{}'", query_string))?; + + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + + let branches = crate::branches::get_branches(&self.conf, &tenantid)?; let branches_buf = serde_json::to_vec(&branches)?; pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? .write_message_noflush(&BeMessage::DataRow(&[Some(&branches_buf)]))? .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"status") { + } else if query_string.starts_with("tenant_list") { + let tenants = crate::branches::get_tenants(&self.conf)?; + let tenants_buf = serde_json::to_vec(&tenants)?; + + pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? + .write_message_noflush(&BeMessage::DataRow(&[Some(&tenants_buf)]))? + .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } else if query_string.starts_with("tenant_create") { + let err = || anyhow!("invalid tenant_create: '{}'", query_string); + + // tenant_create + let re = Regex::new(r"^tenant_create ([[:xdigit:]]+)$").unwrap(); + let caps = re.captures(&query_string).ok_or_else(err)?; + + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let wal_redo_manager = Arc::new(PostgresRedoManager::new(self.conf, tenantid)); + let repo = branches::create_repo(self.conf, tenantid, wal_redo_manager)?; + page_cache::insert_repository_for_tenant(tenantid, Arc::new(repo)); + + pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? + .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } else if query_string.starts_with("status") { pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? .write_message_noflush(&HELLO_WORLD_ROW)? 
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.to_ascii_lowercase().starts_with(b"set ") { + } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'" // on connect pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"do_gc ") { + } else if query_string.starts_with("do_gc ") { // Run GC immediately on given timeline. // FIXME: This is just for tests. See test_runner/batch_others/test_gc.py. // This probably should require special authentication or a global flag to // enable, I don't think we want to or need to allow regular clients to invoke // GC. - let query_str = std::str::from_utf8(&query_string)?; - let mut it = query_str.split(' '); - it.next().unwrap(); + // do_gc + let re = Regex::new(r"^do_gc ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)([[:digit:]]+)?") + .unwrap(); - let timeline_id: ZTimelineId = it - .next() - .ok_or_else(|| anyhow!("missing timeline id"))? - .parse()?; - let timeline = page_cache::get_repository().get_timeline(timeline_id)?; + let caps = re + .captures(query_string) + .ok_or_else(|| anyhow!("invalid do_gc: '{}'", query_string))?; - let horizon: u64 = it - .next() - .unwrap_or(&self.conf.gc_horizon.to_string()) - .parse()?; + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let gc_horizon: u64 = caps + .get(4) + .map(|h| h.as_str().parse()) + .unwrap_or(Ok(self.conf.gc_horizon))?; - let result = timeline.gc_iteration(horizon, true)?; + let timeline = + page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?; + + let result = timeline.gc_iteration(gc_horizon, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor { diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index c4b507c0f2..ffa7cf4fe6 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -31,6 +31,7 @@ /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>; + // TODO get timelines? //fn get_stats(&self) -> RepositoryStats; } @@ -144,13 +145,13 @@ /// but it can be explicitly requested through page server API. /// /// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval). - /// `compact` parameter is used to force compaction of storage. - /// Some storage implementation are based on LSM tree and require periodic merge (compaction). - /// Usually storage implementation determines itself when compaction should be performed. - /// But for GC tests it way be useful to force compaction just after completion of GC iteration - /// to make sure that all detected garbage is removed. - /// So right now `compact` is set to true when GC explicitly requested through page srver API, - /// and is st to false in GC threads which infinitely repeats GC iterations in loop. + /// The `compact` parameter is used to force compaction of the storage. + /// Some storage implementations are based on LSM trees and require periodic merging (compaction). + /// Usually the storage implementation itself determines when compaction should be performed. + /// But for GC tests it may be useful to force compaction just after a GC iteration completes, + /// to make sure that all detected garbage is removed.
+    /// So right now `compact` is set to true when GC is explicitly requested through the page server API,
+    /// and is set to false in GC threads which infinitely repeat GC iterations in a loop.
     fn gc_iteration(&self, horizon: u64, compact: bool) -> Result<GcResult>;

     // Check transaction status
@@ -311,7 +312,7 @@ mod tests {
     use crate::object_repository::{ObjectValue, PageEntry, RelationSizeEntry};
     use crate::rocksdb_storage::RocksObjectStore;
     use crate::walredo::{WalRedoError, WalRedoManager};
-    use crate::PageServerConf;
+    use crate::{PageServerConf, ZTenantId};
     use postgres_ffi::pg_constants;
     use std::fs;
     use std::path::PathBuf;
@@ -356,7 +357,7 @@ mod tests {
     fn get_test_repo(test_name: &str) -> Result<Box<dyn Repository>> {
         let repo_dir = PathBuf::from(format!("../tmp_check/test_{}", test_name));
         let _ = fs::remove_dir_all(&repo_dir);
-        fs::create_dir_all(&repo_dir)?;
+        fs::create_dir_all(&repo_dir).unwrap();

         let conf = PageServerConf {
             daemonize: false,
@@ -371,12 +372,15 @@
         // Make a static copy of the config. This can never be free'd, but that's
         // OK in a test.
         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
+        let tenantid = ZTenantId::generate();
+        fs::create_dir_all(conf.tenant_path(&tenantid)).unwrap();

-        let obj_store = RocksObjectStore::create(conf)?;
+        let obj_store = RocksObjectStore::create(conf, &tenantid)?;

         let walredo_mgr = TestRedoManager {};

-        let repo = ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr));
+        let repo =
+            ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr), tenantid);

         Ok(Box::new(repo))
     }
diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs
index ad5e7cb298..61e839d5db 100644
--- a/pageserver/src/restore_local_repo.rs
+++ b/pageserver/src/restore_local_repo.rs
@@ -18,6 +18,7 @@ use crate::object_key::*;
 use crate::repository::*;
 use crate::waldecoder::*;
 use crate::PageServerConf;
+use crate::ZTenantId;
 use crate::ZTimelineId;
 use postgres_ffi::relfile_utils::*;
 use postgres_ffi::xlog_utils::*;
@@ -30,9 +31,12 @@ const MAX_MBR_BLKNO: u32 =
 ///
 /// Find latest snapshot in a timeline's 'snapshots' directory
 ///
-pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Result<Lsn> {
-    let snapshotspath = format!("timelines/{}/snapshots", timeline);
-
+pub fn find_latest_snapshot(
+    conf: &PageServerConf,
+    timelineid: &ZTimelineId,
+    tenantid: &ZTenantId,
+) -> Result<Lsn> {
+    let snapshotspath = conf.snapshots_path(timelineid, tenantid);
     let mut last_snapshot_lsn = Lsn(0);
     for direntry in fs::read_dir(&snapshotspath).unwrap() {
         let filename = direntry.unwrap().file_name();
@@ -45,7 +49,10 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Result<Lsn> {
     }

     if last_snapshot_lsn == Lsn(0) {
-        error!("could not find valid snapshot in {}", &snapshotspath);
+        error!(
+            "could not find valid snapshot in {}",
+            snapshotspath.display()
+        );
         // TODO return error?
     }
     Ok(last_snapshot_lsn)
diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs
index 3164def5a9..f13b478d05 100644
--- a/pageserver/src/rocksdb_storage.rs
+++ b/pageserver/src/rocksdb_storage.rs
@@ -5,6 +5,7 @@ use crate::object_key::*;
 use crate::object_store::ObjectStore;
 use crate::repository::RelTag;
 use crate::PageServerConf;
+use crate::ZTenantId;
 use crate::ZTimelineId;
 use anyhow::{bail, Result};
 use serde::{Deserialize, Serialize};
@@ -241,26 +242,30 @@ impl ObjectStore for RocksObjectStore {

 impl RocksObjectStore {
     /// Open a RocksDB database.
-    pub fn open(conf: &'static PageServerConf) -> Result<RocksObjectStore> {
+    pub fn open(conf: &'static PageServerConf, tenantid: &ZTenantId) -> Result<RocksObjectStore> {
         let opts = Self::get_rocksdb_opts();
-        let obj_store = Self::new(conf, opts)?;
+        let obj_store = Self::new(conf, opts, tenantid)?;
         Ok(obj_store)
     }

     /// Create a new, empty RocksDB database.
-    pub fn create(conf: &'static PageServerConf) -> Result<RocksObjectStore> {
-        let path = conf.workdir.join("rocksdb-storage");
+    pub fn create(conf: &'static PageServerConf, tenantid: &ZTenantId) -> Result<RocksObjectStore> {
+        let path = conf.tenant_path(&tenantid).join("rocksdb-storage");
         std::fs::create_dir(&path)?;

         let mut opts = Self::get_rocksdb_opts();
         opts.create_if_missing(true);
         opts.set_error_if_exists(true);
-        let obj_store = Self::new(conf, opts)?;
+        let obj_store = Self::new(conf, opts, tenantid)?;
         Ok(obj_store)
     }

-    fn new(conf: &'static PageServerConf, mut opts: rocksdb::Options) -> Result<RocksObjectStore> {
-        let path = conf.workdir.join("rocksdb-storage");
+    fn new(
+        conf: &'static PageServerConf,
+        mut opts: rocksdb::Options,
+        tenantid: &ZTenantId,
+    ) -> Result<RocksObjectStore> {
+        let path = conf.tenant_path(&tenantid).join("rocksdb-storage");
         let gc = Arc::new(GarbageCollector::new());
         let gc_ref = gc.clone();
         opts.set_compaction_filter("ttl", move |_level: u32, key: &[u8], _val: &[u8]| {
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 52c5f0e470..abd3218e86 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -9,6 +9,7 @@ use crate::page_cache;
 use crate::restore_local_repo;
 use crate::waldecoder::*;
 use crate::PageServerConf;
+use crate::ZTenantId;
 use crate::ZTimelineId;
 use anyhow::{Error, Result};
 use lazy_static::lazy_static;
@@ -25,7 +26,6 @@ use std::collections::HashMap;
 use std::fs;
 use std::fs::{File, OpenOptions};
 use std::io::{Seek, SeekFrom, Write};
-use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Mutex;
 use std::thread;
@@ -50,6 +50,7 @@ pub fn launch_wal_receiver(
     conf: &'static PageServerConf,
     timelineid: ZTimelineId,
     wal_producer_connstr: &str,
+    tenantid: ZTenantId,
 ) {
     let mut receivers = WAL_RECEIVERS.lock().unwrap();

@@ -67,7 +68,7 @@ pub fn launch_wal_receiver(
         let _walreceiver_thread = thread::Builder::new()
             .name("WAL receiver thread".into())
             .spawn(move || {
-                thread_main(conf, timelineid);
+                thread_main(conf, timelineid, &tenantid);
             })
             .unwrap();
     }
@@ -88,7 +89,7 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
 //
 // This is the entry point for the WAL receiver thread.
 //
-fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId) {
+fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: &ZTenantId) {
     info!(
         "WAL receiver thread started for timeline : '{}'",
         timelineid
@@ -102,7 +103,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId) {
         // Look up the current WAL producer address
         let wal_producer_connstr = get_wal_producer_connstr(timelineid);

-        let res = walreceiver_main(conf, timelineid, &wal_producer_connstr);
+        let res = walreceiver_main(conf, timelineid, &wal_producer_connstr, tenantid);

         if let Err(e) = res {
             info!(
@@ -115,9 +116,10 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId) {
 }

 fn walreceiver_main(
-    _conf: &PageServerConf,
+    conf: &PageServerConf,
     timelineid: ZTimelineId,
     wal_producer_connstr: &str,
+    tenantid: &ZTenantId,
 ) -> Result<(), Error> {
     // Connect to the database in replication mode.
info!("connecting to {:?}", wal_producer_connstr); @@ -134,7 +136,7 @@ fn walreceiver_main( let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; - let repository = page_cache::get_repository(); + let repository = page_cache::get_repository_for_tenant(tenantid)?; let timeline = repository.get_timeline(timelineid).unwrap(); // @@ -183,7 +185,14 @@ fn walreceiver_main( let endlsn = startlsn + data.len() as u64; let prev_last_rec_lsn = last_rec_lsn; - write_wal_file(startlsn, timelineid, pg_constants::WAL_SEGMENT_SIZE, data)?; + write_wal_file( + conf, + startlsn, + &timelineid, + pg_constants::WAL_SEGMENT_SIZE, + data, + tenantid, + )?; trace!("received XLogData between {} and {}", startlsn, endlsn); @@ -237,9 +246,11 @@ fn walreceiver_main( { info!("switched segment {} to {}", prev_last_rec_lsn, last_rec_lsn); let (oldest_segno, newest_segno) = find_wal_file_range( - timelineid, + conf, + &timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn, + tenantid, )?; if newest_segno - oldest_segno >= 10 { @@ -296,16 +307,18 @@ fn walreceiver_main( } fn find_wal_file_range( - timeline: ZTimelineId, + conf: &PageServerConf, + timeline: &ZTimelineId, wal_seg_size: usize, written_upto: Lsn, + tenant: &ZTenantId, ) -> Result<(u64, u64)> { let written_upto_segno = written_upto.segment_number(wal_seg_size); let mut oldest_segno = written_upto_segno; let mut newest_segno = written_upto_segno; // Scan the wal directory, and count how many WAL filed we could remove - let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline)); + let wal_dir = conf.wal_dir_path(timeline, tenant); for entry in fs::read_dir(wal_dir)? { let entry = entry?; let path = entry.path(); @@ -382,10 +395,12 @@ pub fn identify_system(client: &mut Client) -> Result { } fn write_wal_file( + conf: &PageServerConf, startpos: Lsn, - timeline: ZTimelineId, + timelineid: &ZTimelineId, wal_seg_size: usize, buf: &[u8], + tenantid: &ZTenantId, ) -> anyhow::Result<()> { let mut bytes_left: usize = buf.len(); let mut bytes_written: usize = 0; @@ -393,7 +408,7 @@ fn write_wal_file( let mut start_pos = startpos; const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline)); + let wal_dir = conf.wal_dir_path(timelineid, tenantid); /* Extract WAL location for this block */ let mut xlogoff = start_pos.segment_offset(wal_seg_size); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e43b9c354a..102c8eb347 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -42,6 +42,7 @@ use crate::repository::WALRecord; use crate::waldecoder::XlXactParsedRecord; use crate::waldecoder::{MultiXactId, XlMultiXactCreate}; use crate::PageServerConf; +use crate::ZTenantId; use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; use postgres_ffi::XLogRecord; @@ -100,6 +101,8 @@ struct PostgresRedoManagerInternal { conf: &'static PageServerConf, request_rx: mpsc::Receiver, + + tenantid: ZTenantId, } #[derive(Debug)] @@ -131,7 +134,7 @@ impl PostgresRedoManager { /// Create a new PostgresRedoManager. /// /// This launches a new thread to handle the requests. 
-    pub fn new(conf: &'static PageServerConf) -> PostgresRedoManager {
+    pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
         let (tx, rx) = mpsc::channel();

         //
@@ -146,7 +149,11 @@ impl PostgresRedoManager {
         let _walredo_thread = std::thread::Builder::new()
             .name("WAL redo thread".into())
             .spawn(move || {
-                let mut internal = PostgresRedoManagerInternal { conf, request_rx };
+                let mut internal = PostgresRedoManagerInternal {
+                    conf,
+                    request_rx,
+                    tenantid,
+                };
                 internal.wal_redo_main();
             })
             .unwrap();
@@ -219,7 +226,7 @@ impl PostgresRedoManagerInternal {
     // Main entry point for the WAL applicator thread.
     //
     fn wal_redo_main(&mut self) {
-        info!("WAL redo thread started");
+        info!("WAL redo thread started for tenant: {}", self.tenantid);

         // We block on waiting for requests on the walredo request channel, but
         // use async I/O to communicate with the child process. Initialize the
@@ -231,10 +238,13 @@ impl PostgresRedoManagerInternal {

         let process: PostgresRedoProcess;

-        info!("launching WAL redo postgres process");
+        info!(
+            "launching WAL redo postgres process for tenant: {}",
+            self.tenantid
+        );

         process = runtime
-            .block_on(PostgresRedoProcess::launch(self.conf))
+            .block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))
             .unwrap();

         // Loop forever, handling requests as they come.
@@ -454,11 +464,14 @@ impl PostgresRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
    //
-    async fn launch(conf: &PageServerConf) -> Result<PostgresRedoProcess> {
+    async fn launch(
+        conf: &PageServerConf,
+        tenantid: &ZTenantId,
+    ) -> Result<PostgresRedoProcess> {
         // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
         // just create one with constant name. That fails if you try to launch more than
         // one WAL redo manager concurrently.
-        let datadir = conf.workdir.join("wal-redo-datadir");
+        let datadir = conf.tenant_path(&tenantid).join("wal-redo-datadir");

         // Create empty data directory for wal-redo postgres, deleting old one first.
         if datadir.exists() {
diff --git a/test_runner/batch_others/test_branch_behind.py b/test_runner/batch_others/test_branch_behind.py
index bcb66cb1f3..6b9e91a258 100644
--- a/test_runner/batch_others/test_branch_behind.py
+++ b/test_runner/batch_others/test_branch_behind.py
@@ -1,10 +1,13 @@
+from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
+
+
 pytest_plugins = ("fixtures.zenith_fixtures")


 #
 # Create a couple of branches off the main branch, at a historical point in time.
# -def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin): +def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): # Branch at the point where only 100 rows were inserted zenith_cli.run(["branch", "test_branch_behind", "empty"]) diff --git a/test_runner/batch_others/test_config.py b/test_runner/batch_others/test_config.py index 00c27b1ac1..d8cc798839 100644 --- a/test_runner/batch_others/test_config.py +++ b/test_runner/batch_others/test_config.py @@ -1,12 +1,14 @@ from contextlib import closing +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + pytest_plugins = ("fixtures.zenith_fixtures") # # Test starting Postgres with custom options # -def test_config(zenith_cli, pageserver, postgres, pg_bin): +def test_config(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): # Create a branch for us zenith_cli.run(["branch", "test_config", "empty"]) diff --git a/test_runner/batch_others/test_createdb.py b/test_runner/batch_others/test_createdb.py index bfc2224e05..7e582ccb3d 100644 --- a/test_runner/batch_others/test_createdb.py +++ b/test_runner/batch_others/test_createdb.py @@ -1,4 +1,5 @@ from contextlib import closing +from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory, ZenithCli pytest_plugins = ("fixtures.zenith_fixtures") @@ -6,7 +7,12 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test CREATE DATABASE when there have been relmapper changes # -def test_createdb(zenith_cli, pageserver, postgres, pg_bin): +def test_createdb( + zenith_cli: ZenithCli, + pageserver: ZenithPageserver, + postgres: PostgresFactory, + pg_bin, +): zenith_cli.run(["branch", "test_createdb", "empty"]) pg = postgres.create_start('test_createdb') diff --git a/test_runner/batch_others/test_createuser.py b/test_runner/batch_others/test_createuser.py index b821a233d1..f44df91c3c 100644 --- a/test_runner/batch_others/test_createuser.py +++ b/test_runner/batch_others/test_createuser.py @@ -1,12 +1,14 @@ from contextlib import closing +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + pytest_plugins = ("fixtures.zenith_fixtures") # # Test CREATE USER to check shared catalog restore # -def test_createuser(zenith_cli, pageserver, postgres, pg_bin): +def test_createuser(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): zenith_cli.run(["branch", "test_createuser", "empty"]) pg = postgres.create_start('test_createuser') diff --git a/test_runner/batch_others/test_gc.py b/test_runner/batch_others/test_gc.py index 116e628c8f..12076a8418 100644 --- a/test_runner/batch_others/test_gc.py +++ b/test_runner/batch_others/test_gc.py @@ -1,4 +1,5 @@ from contextlib import closing +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver import psycopg2.extras pytest_plugins = ("fixtures.zenith_fixtures") @@ -9,7 +10,7 @@ pytest_plugins = ("fixtures.zenith_fixtures") # This test is pretty tightly coupled with the current implementation of page version storage # and garbage collection in object_repository.rs. # -def test_gc(zenith_cli, pageserver, postgres, pg_bin): +def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): zenith_cli.run(["branch", "test_gc", "empty"]) pg = postgres.create_start('test_gc') @@ -30,7 +31,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): # before running the actual tests below, otherwise the counts won't match # what we expect. 
print("Running GC before test") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) # remember the number of relations @@ -42,7 +43,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): n_relations += 1; print("Inserting one row and running GC") cur.execute("INSERT INTO foo VALUES (1)") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations @@ -55,7 +56,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): cur.execute("INSERT INTO foo VALUES (2)") cur.execute("INSERT INTO foo VALUES (3)") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations @@ -68,7 +69,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): print("Inserting one more row") cur.execute("INSERT INTO foo VALUES (3)") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations @@ -77,7 +78,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): assert row['deleted'] == 1 # Run GC again, with no changes in the database. Should not remove anything. - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations @@ -90,7 +91,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): # cur.execute("DROP TABLE foo") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) # Each relation fork is counted separately, hence 3. diff --git a/test_runner/batch_others/test_multixact.py b/test_runner/batch_others/test_multixact.py index f64be25bcf..49c6e4aa66 100644 --- a/test_runner/batch_others/test_multixact.py +++ b/test_runner/batch_others/test_multixact.py @@ -1,3 +1,5 @@ +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + pytest_plugins = ("fixtures.zenith_fixtures") @@ -7,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures") # it only checks next_multixact_id field in restored pg_control, # since we don't have functions to check multixact internals. 
# -def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir): +def test_multixact(pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin, zenith_cli, base_dir): # Create a branch for us zenith_cli.run(["branch", "test_multixact", "empty"]) pg = postgres.create_start('test_multixact') diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index e48c3b105d..4bc0c36a08 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -1,4 +1,8 @@ import json +import uuid +import pytest +import psycopg2 +from fixtures.zenith_fixtures import ZenithPageserver pytest_plugins = ("fixtures.zenith_fixtures") @@ -9,14 +13,14 @@ def test_status(pageserver): ] -def test_branch_list(pageserver, zenith_cli): +def test_branch_list(pageserver: ZenithPageserver, zenith_cli): # Create a branch for us zenith_cli.run(["branch", "test_branch_list_main", "empty"]) conn = pageserver.connect() cur = conn.cursor() - cur.execute('branch_list') + cur.execute(f'branch_list {pageserver.initial_tenant}') branches = json.loads(cur.fetchone()[0]) # Filter out branches created by other tests branches = [x for x in branches if x['name'].startswith('test_branch_list')] @@ -32,7 +36,7 @@ def test_branch_list(pageserver, zenith_cli): zenith_cli.run(['branch', 'test_branch_list_experimental', 'test_branch_list_main']) zenith_cli.run(['pg', 'create', 'test_branch_list_experimental']) - cur.execute('branch_list') + cur.execute(f'branch_list {pageserver.initial_tenant}') new_branches = json.loads(cur.fetchone()[0]) # Filter out branches created by other tests new_branches = [x for x in new_branches if x['name'].startswith('test_branch_list')] @@ -46,3 +50,27 @@ def test_branch_list(pageserver, zenith_cli): assert new_branches[1] == branches[0] conn.close() + + +def test_tenant_list(pageserver: ZenithPageserver, zenith_cli): + res = zenith_cli.run(["tenant", "list"]) + res.check_returncode() + tenants = res.stdout.splitlines() + assert tenants == [pageserver.initial_tenant] + + conn = pageserver.connect() + cur = conn.cursor() + + # check same tenant cannot be created twice + with pytest.raises(psycopg2.DatabaseError, match=f'repo for {pageserver.initial_tenant} already exists'): + cur.execute(f'tenant_create {pageserver.initial_tenant}') + + # create one more tenant + tenant1 = uuid.uuid4().hex + cur.execute(f'tenant_create {tenant1}') + + cur.execute('tenant_list') + + # compare tenants list + new_tenants = sorted(json.loads(cur.fetchone()[0])) + assert sorted([pageserver.initial_tenant, tenant1]) == new_tenants diff --git a/test_runner/batch_others/test_pgbench.py b/test_runner/batch_others/test_pgbench.py index 486dd77496..a5423cf3d7 100644 --- a/test_runner/batch_others/test_pgbench.py +++ b/test_runner/batch_others/test_pgbench.py @@ -1,8 +1,9 @@ +from fixtures.zenith_fixtures import PostgresFactory + pytest_plugins = ("fixtures.zenith_fixtures") -def test_pgbench(pageserver, postgres, pg_bin, zenith_cli): - +def test_pgbench(postgres: PostgresFactory, pg_bin, zenith_cli): # Create a branch for us zenith_cli.run(["branch", "test_pgbench", "empty"]) diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index dc3be94bfe..464d3ed3d7 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -1,4 +1,5 @@ from contextlib import closing +from fixtures.zenith_fixtures import 
ZenithPageserver, PostgresFactory pytest_plugins = ("fixtures.zenith_fixtures") @@ -6,7 +7,7 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test restarting and recreating a postgres instance # -def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin): +def test_restart_compute(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): zenith_cli.run(["branch", "test_restart_compute", "empty"]) pg = postgres.create_start('test_restart_compute') diff --git a/test_runner/batch_others/test_tenants.py b/test_runner/batch_others/test_tenants.py new file mode 100644 index 0000000000..ee6bb0bfd3 --- /dev/null +++ b/test_runner/batch_others/test_tenants.py @@ -0,0 +1,48 @@ +from contextlib import closing + +import pytest + +from fixtures.zenith_fixtures import ( + TenantFactory, + ZenithCli, + PostgresFactory, +) + + +@pytest.mark.parametrize('with_wal_acceptors', [False, True]) +def test_tenants_normal_work( + zenith_cli: ZenithCli, + tenant_factory: TenantFactory, + postgres: PostgresFactory, + wa_factory, + with_wal_acceptors: bool, +): + """Tests tenants with and without wal acceptors""" + tenant_1 = tenant_factory.create() + tenant_2 = tenant_factory.create() + + zenith_cli.run(["branch", f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", "main", f"--tenantid={tenant_1}"]) + zenith_cli.run(["branch", f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", "main", f"--tenantid={tenant_2}"]) + if with_wal_acceptors: + wa_factory.start_n_new(3) + + pg_tenant1 = postgres.create_start( + f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", + tenant_1, + wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, + ) + pg_tenant2 = postgres.create_start( + f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", + tenant_2, + wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, + ) + + for pg in [pg_tenant1, pg_tenant2]: + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + cur.execute("CREATE TABLE t(key int primary key, value text)") + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + cur.execute("SELECT sum(key) FROM t") + assert cur.fetchone() == (5000050000,) diff --git a/test_runner/batch_others/test_twophase.py b/test_runner/batch_others/test_twophase.py index d4ffb9dc6c..ab1cdf3001 100644 --- a/test_runner/batch_others/test_twophase.py +++ b/test_runner/batch_others/test_twophase.py @@ -1,10 +1,13 @@ +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + + pytest_plugins = ("fixtures.zenith_fixtures") # # Test branching, when a transaction is in prepared state # -def test_twophase(zenith_cli, pageserver, postgres, pg_bin): +def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): zenith_cli.run(["branch", "test_twophase", "empty"]) pg = postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5']) @@ -28,8 +31,10 @@ def test_twophase(zenith_cli, pageserver, postgres, pg_bin): # Create a branch with the transaction in prepared state zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"]) - pg2 = postgres.create_start('test_twophase_prepared', - config_lines=['max_prepared_transactions=5']) + pg2 = postgres.create_start( + 'test_twophase_prepared', + config_lines=['max_prepared_transactions=5'], + ) conn2 = pg2.connect() cur2 = conn2.cursor() diff 
--git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index bf5a7f19e7..371a740f0f 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -4,13 +4,14 @@ import time from contextlib import closing from multiprocessing import Process, Value +from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory pytest_plugins = ("fixtures.zenith_fixtures") # basic test, write something in setup with wal acceptors, ensure that commits # succeed and data is written -def test_normal_work(zenith_cli, pageserver, postgres, wa_factory): +def test_normal_work(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory): zenith_cli.run(["branch", "test_wal_acceptors_normal_work", "empty"]) wa_factory.start_n_new(3) pg = postgres.create_start('test_wal_acceptors_normal_work', @@ -28,7 +29,7 @@ def test_normal_work(zenith_cli, pageserver, postgres, wa_factory): # Run page server and multiple acceptors, and multiple compute nodes running # against different timelines. -def test_many_timelines(zenith_cli, pageserver, postgres, wa_factory): +def test_many_timelines(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory): n_timelines = 2 wa_factory.start_n_new(3) @@ -60,7 +61,7 @@ def test_many_timelines(zenith_cli, pageserver, postgres, wa_factory): # Check that dead minority doesn't prevent the commits: execute insert n_inserts # times, with fault_probability chance of getting a wal acceptor down or up # along the way. 2 of 3 are always alive, so the work keeps going. -def test_restarts(zenith_cli, pageserver, postgres, wa_factory): +def test_restarts(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory): fault_probability = 0.01 n_inserts = 1000 n_acceptors = 3 @@ -101,7 +102,7 @@ def delayed_wal_acceptor_start(wa): # When majority of acceptors is offline, commits are expected to be frozen -def test_unavailability(zenith_cli, pageserver, postgres, wa_factory): +def test_unavailability(zenith_cli, postgres: PostgresFactory, wa_factory): wa_factory.start_n_new(2) zenith_cli.run(["branch", "test_wal_acceptors_unavailability", "empty"]) @@ -171,7 +172,7 @@ def stop_value(): # do inserts while concurrently getting up/down subsets of acceptors -def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_value): +def test_race_conditions(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory, stop_value): wa_factory.start_n_new(3) diff --git a/test_runner/batch_others/test_zenith_cli.py b/test_runner/batch_others/test_zenith_cli.py index 987a38753a..be9e2b07fd 100644 --- a/test_runner/batch_others/test_zenith_cli.py +++ b/test_runner/batch_others/test_zenith_cli.py @@ -1,43 +1,50 @@ import json +import uuid + +from fixtures.zenith_fixtures import ZenithCli, ZenithPageserver pytest_plugins = ("fixtures.zenith_fixtures") -def helper_compare_branch_list(page_server_cur, zenith_cli): +def helper_compare_branch_list(page_server_cur, zenith_cli, initial_tenant: str): """ Compare branches list returned by CLI and directly via API. Filters out branches created by other tests. 
""" - page_server_cur.execute('branch_list') + page_server_cur.execute(f'branch_list {initial_tenant}') branches_api = sorted(map(lambda b: b['name'], json.loads(page_server_cur.fetchone()[0]))) branches_api = [b for b in branches_api if b.startswith('test_cli_') or b in ('empty', 'main')] res = zenith_cli.run(["branch"]) - assert res.stderr == '' + res.check_returncode() branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n"))) branches_cli = [b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')] - assert branches_api == branches_cli + res = zenith_cli.run(["branch", f"--tenantid={initial_tenant}"]) + res.check_returncode() + branches_cli_with_tenant_arg = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n"))) + branches_cli_with_tenant_arg = [b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')] + + assert branches_api == branches_cli == branches_cli_with_tenant_arg -def test_cli_branch_list(pageserver, zenith_cli): - +def test_cli_branch_list(pageserver: ZenithPageserver, zenith_cli): page_server_conn = pageserver.connect() page_server_cur = page_server_conn.cursor() # Initial sanity check - helper_compare_branch_list(page_server_cur, zenith_cli) + helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant) # Create a branch for us res = zenith_cli.run(["branch", "test_cli_branch_list_main", "main"]) assert res.stderr == '' - helper_compare_branch_list(page_server_cur, zenith_cli) + helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant) # Create a nested branch res = zenith_cli.run(["branch", "test_cli_branch_list_nested", "test_cli_branch_list_main"]) assert res.stderr == '' - helper_compare_branch_list(page_server_cur, zenith_cli) + helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant) # Check that all new branches are visible via CLI res = zenith_cli.run(["branch"]) @@ -46,3 +53,45 @@ def test_cli_branch_list(pageserver, zenith_cli): assert 'test_cli_branch_list_main' in branches_cli assert 'test_cli_branch_list_nested' in branches_cli + +def helper_compare_tenant_list(page_server_cur, zenith_cli: ZenithCli): + page_server_cur.execute(f'tenant_list') + tenants_api = sorted(json.loads(page_server_cur.fetchone()[0])) + + res = zenith_cli.run(["tenant", "list"]) + assert res.stderr == '' + tenants_cli = sorted(res.stdout.splitlines()) + + assert tenants_api == tenants_cli + + +def test_cli_tenant_list(pageserver: ZenithPageserver, zenith_cli: ZenithCli): + page_server_conn = pageserver.connect() + page_server_cur = page_server_conn.cursor() + + # Initial sanity check + helper_compare_tenant_list(page_server_cur, zenith_cli) + + # Create new tenant + tenant1 = uuid.uuid4().hex + res = zenith_cli.run(["tenant", "create", tenant1]) + res.check_returncode() + + # check tenant1 appeared + helper_compare_tenant_list(page_server_cur, zenith_cli) + + # Create new tenant + tenant2 = uuid.uuid4().hex + res = zenith_cli.run(["tenant", "create", tenant2]) + res.check_returncode() + + # check tenant2 appeared + helper_compare_tenant_list(page_server_cur, zenith_cli) + + res = zenith_cli.run(["tenant", "list"]) + res.check_returncode() + tenants = sorted(res.stdout.splitlines()) + + assert pageserver.initial_tenant in tenants + assert tenant1 in tenants + assert tenant2 in tenants diff --git a/test_runner/batch_pg_regress/test_isolation.py b/test_runner/batch_pg_regress/test_isolation.py index 
1bd5aeb0b9..ae654401cc 100644 --- a/test_runner/batch_pg_regress/test_isolation.py +++ b/test_runner/batch_pg_regress/test_isolation.py @@ -1,11 +1,12 @@ import os from fixtures.utils import mkdir_if_needed +from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory pytest_plugins = ("fixtures.zenith_fixtures") -def test_isolation(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, +def test_isolation(pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, base_dir, capsys): # Create a branch for us diff --git a/test_runner/batch_pg_regress/test_pg_regress.py b/test_runner/batch_pg_regress/test_pg_regress.py index d119fdaff9..e45f3c199c 100644 --- a/test_runner/batch_pg_regress/test_pg_regress.py +++ b/test_runner/batch_pg_regress/test_pg_regress.py @@ -1,11 +1,12 @@ import os from fixtures.utils import mkdir_if_needed +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver pytest_plugins = ("fixtures.zenith_fixtures") -def test_pg_regress(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, +def test_pg_regress(pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, base_dir, capsys): # Create a branch for us diff --git a/test_runner/batch_pg_regress/test_zenith_regress.py b/test_runner/batch_pg_regress/test_zenith_regress.py index cf77cc41e5..ccda49295b 100644 --- a/test_runner/batch_pg_regress/test_zenith_regress.py +++ b/test_runner/batch_pg_regress/test_zenith_regress.py @@ -1,11 +1,12 @@ import os from fixtures.utils import mkdir_if_needed +from fixtures.zenith_fixtures import PostgresFactory pytest_plugins = ("fixtures.zenith_fixtures") -def test_zenith_regress(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, +def test_zenith_regress(postgres: PostgresFactory, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, base_dir, capsys): # Create a branch for us diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index b17992549a..154ca56b3e 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1,5 +1,7 @@ import getpass import os +import pathlib +import uuid import psycopg2 import pytest import shutil @@ -169,16 +171,15 @@ class ZenithPageserver(PgProtocol): """ An object representing a running pageserver. """ def __init__(self, zenith_cli: ZenithCli): super().__init__(host='localhost', port=DEFAULT_PAGESERVER_PORT) - self.zenith_cli = zenith_cli self.running = False + self.initial_tenant = None def init(self) -> 'ZenithPageserver': """ Initialize the repository, i.e. run "zenith init". Returns self. """ - self.zenith_cli.run(['init']) return self @@ -190,6 +191,8 @@ class ZenithPageserver(PgProtocol): self.zenith_cli.run(['start']) self.running = True + # get newly created tenant id + self.initial_tenant = self.zenith_cli.run(['tenant', 'list']).stdout.strip() return self def stop(self) -> 'ZenithPageserver': @@ -232,7 +235,7 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]: class Postgres(PgProtocol): """ An object representing a running postgres daemon. 
""" - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, instance_num: int): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, instance_num: int, tenant_id: str): super().__init__(host='localhost', port=55431 + instance_num) self.zenith_cli = zenith_cli @@ -240,12 +243,15 @@ class Postgres(PgProtocol): self.running = False self.repo_dir = repo_dir self.branch: Optional[str] = None # dubious, see asserts below - # path to conf is /pgdatadirs//postgresql.conf + self.tenant_id = tenant_id + # path to conf is /pgdatadirs/tenants///postgresql.conf - def create(self, - branch: str, - wal_acceptors: Optional[str] = None, - config_lines: Optional[List[str]] = None) -> 'Postgres': + def create( + self, + branch: str, + wal_acceptors: Optional[str] = None, + config_lines: Optional[List[str]] = None, + ) -> 'Postgres': """ Create the pg data directory. If wal_acceptors is not None, node will use wal acceptors; config is @@ -256,7 +262,7 @@ class Postgres(PgProtocol): if not config_lines: config_lines = [] - self.zenith_cli.run(['pg', 'create', branch]) + self.zenith_cli.run(['pg', 'create', branch, f'--tenantid={self.tenant_id}']) self.branch = branch if wal_acceptors is not None: self.adjust_for_wal_acceptors(wal_acceptors) @@ -273,14 +279,14 @@ class Postgres(PgProtocol): """ assert self.branch is not None - self.zenith_cli.run(['pg', 'start', self.branch]) + self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}']) self.running = True return self def config_file_path(self) -> str: """ Path to postgresql.conf """ - filename = f'pgdatadirs/{self.branch}/postgresql.conf' + filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf' return os.path.join(self.repo_dir, filename) def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres': @@ -326,7 +332,7 @@ class Postgres(PgProtocol): if self.running: assert self.branch is not None - self.zenith_cli.run(['pg', 'stop', self.branch]) + self.zenith_cli.run(['pg', 'stop', self.branch, f'--tenantid={self.tenant_id}']) self.running = False return self @@ -338,42 +344,57 @@ class Postgres(PgProtocol): """ assert self.branch is not None - self.zenith_cli.run(['pg', 'stop', '--destroy', self.branch]) + assert self.tenant_id is not None + self.zenith_cli.run(['pg', 'stop', '--destroy', self.branch, f'--tenantid={self.tenant_id}']) return self - def create_start(self, - branch: str, - wal_acceptors: Optional[str] = None, - config_lines: Optional[List[str]] = None) -> 'Postgres': + def create_start( + self, + branch: str, + wal_acceptors: Optional[str] = None, + config_lines: Optional[List[str]] = None, + ) -> 'Postgres': """ Create a Postgres instance, then start it. Returns self. """ - self.create(branch, wal_acceptors, config_lines).start() + self.create( + branch=branch, + wal_acceptors=wal_acceptors, + config_lines=config_lines, + ).start() return self class PostgresFactory: """ An object representing multiple running postgres daemons. 
""" - def __init__(self, zenith_cli: ZenithCli, repo_dir: str): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str): self.zenith_cli = zenith_cli self.repo_dir = repo_dir self.num_instances = 0 self.instances: List[Postgres] = [] + self.initial_tenant: str = initial_tenant - def create_start(self, - branch: str = "main", - wal_acceptors: Optional[str] = None, - config_lines: Optional[List[str]] = None) -> Postgres: + def create_start( + self, + branch: str = "main", + tenant_id: Optional[str] = None, + wal_acceptors: Optional[str] = None, + config_lines: Optional[List[str]] = None + ) -> Postgres: - pg = Postgres(self.zenith_cli, self.repo_dir, self.num_instances + 1) + pg = Postgres(self.zenith_cli, self.repo_dir, self.num_instances + 1, tenant_id=tenant_id or self.initial_tenant) self.num_instances += 1 self.instances.append(pg) - return pg.create_start(branch, wal_acceptors, config_lines) + return pg.create_start( + branch=branch, + wal_acceptors=wal_acceptors, + config_lines=config_lines, + ) def stop_all(self) -> 'PostgresFactory': for pg in self.instances: @@ -381,10 +402,14 @@ class PostgresFactory: return self +@zenfixture +def initial_tenant(pageserver: ZenithPageserver): + return pageserver.initial_tenant + @zenfixture -def postgres(zenith_cli: ZenithCli, repo_dir: str) -> Iterator[PostgresFactory]: - pgfactory = PostgresFactory(zenith_cli, repo_dir) +def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]: + pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant) yield pgfactory @@ -636,3 +661,20 @@ def pg_distrib_dir(base_dir: str) -> str: if not os.path.exists(os.path.join(pg_dir, 'bin/postgres')): raise Exception('postgres not found at "{}"'.format(pg_dir)) return pg_dir + + +class TenantFactory: + def __init__(self, cli: ZenithCli): + self.cli = cli + + def create(self, tenant_id: Optional[str] = None): + if tenant_id is None: + tenant_id = uuid.uuid4().hex + res = self.cli.run(['tenant', 'create', tenant_id]) + res.check_returncode() + return tenant_id + + +@zenfixture +def tenant_factory(zenith_cli: ZenithCli): + return TenantFactory(zenith_cli) diff --git a/vendor/postgres b/vendor/postgres index 8ab674ad99..4cb9578650 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 8ab674ad9927095eeaf278fc67d6f8ad5e38c9cb +Subproject commit 4cb95786504e2d3cf8539ef901eb88c0615691d0 diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 15626bd360..f12b6263f5 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -21,7 +21,7 @@ use zenith_utils::lsn::Lsn; use crate::replication::HotStandbyFeedback; use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; -use pageserver::ZTimelineId; +use pageserver::{ZTenantId, ZTimelineId}; use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; use zenith_utils::pq_proto::SystemId; @@ -52,6 +52,7 @@ pub struct ServerInfo { pub wal_end: Lsn, pub timeline: TimeLineID, pub wal_seg_size: u32, + pub tenant_id: ZTenantId, } /// Vote request sent from proposer to safekeepers @@ -101,6 +102,7 @@ impl SafeKeeperInfo { wal_end: Lsn(0), timeline: 0, wal_seg_size: 0, + tenant_id: ZTenantId::from([0u8; 16]), }, commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ flush_lsn: Lsn(0), /* locally flushed part of WAL */ @@ -149,7 +151,7 @@ pub struct ReceiveWalConn { /// Periodically request pageserver to call back. 
/// If pageserver already has replication channel, it will just ignore this request /// -fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId) { +fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZTenantId) { let ps_addr = conf.pageserver_addr.unwrap(); let ps_connstr = format!("postgresql://no_user@{}/no_db", ps_addr); @@ -158,9 +160,10 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId) { let me_conf: Config = me_connstr.parse().unwrap(); let (host, port) = connection_host_port(&me_conf); let callme = format!( - "callmemaybe {} host={} port={} options='-c ztimelineid={}'", - timelineid, host, port, timelineid + "callmemaybe {} {} host={} port={} options='-c ztimelineid={}'", + tenantid, timelineid, host, port, timelineid, ); + loop { info!( "requesting page server to connect to us: start {} {}", @@ -206,8 +209,8 @@ impl ReceiveWalConn { // Receive information about server let server_info = self.read_req::()?; info!( - "Start handshake with wal_proposer {} sysid {} timeline {}", - self.peer_addr, server_info.system_id, server_info.timeline_id, + "Start handshake with wal_proposer {} sysid {} timeline {} tenant {}", + self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id, ); // FIXME: also check that the system identifier matches self.timeline.set(server_info.timeline_id)?; @@ -274,14 +277,15 @@ impl ReceiveWalConn { // Add far as replication in postgres is initiated by receiver, we should use callme mechanism let conf = self.conf.clone(); let timelineid = self.timeline.get().timelineid; + let tenantid = server_info.tenant_id; thread::spawn(move || { - request_callback(conf, timelineid); + request_callback(conf, timelineid, tenantid); }); } info!( - "Start streaming from timeline {} address {:?}", - server_info.timeline_id, self.peer_addr, + "Start streaming from timeline {} tenant {} address {:?}", + server_info.timeline_id, server_info.tenant_id, self.peer_addr, ); // Main loop diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 413114ac1c..4c9c019893 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -4,6 +4,7 @@ use clap::{App, AppSettings, Arg, ArgMatches, SubCommand}; use control_plane::compute::ComputeControlPlane; use control_plane::local_env::{self, LocalEnv}; use control_plane::storage::PageServerNode; +use pageserver::ZTenantId; use std::collections::btree_map::Entry; use std::collections::HashMap; use std::process::exit; @@ -33,9 +34,15 @@ fn main() -> Result<()> { let timeline_arg = Arg::with_name("timeline") .short("n") .index(1) - .help("timeline name") + .help("Timeline name") .required(true); + let tenantid_arg = Arg::with_name("tenantid") + .long("tenantid") + .help("Tenant id. 
Represented as a hexadecimal string 32 symbols length") + .takes_value(true) + .required(false); + let matches = App::new("Zenith CLI") .setting(AppSettings::ArgRequiredElseHelp) .subcommand( @@ -52,7 +59,14 @@ fn main() -> Result<()> { SubCommand::with_name("branch") .about("Create a new branch") .arg(Arg::with_name("branchname").required(false).index(1)) - .arg(Arg::with_name("start-point").required(false).index(2)), + .arg(Arg::with_name("start-point").required(false).index(2)) + .arg(tenantid_arg.clone()), + ).subcommand( + SubCommand::with_name("tenant") + .setting(AppSettings::ArgRequiredElseHelp) + .about("Manage tenants") + .subcommand(SubCommand::with_name("list")) + .subcommand(SubCommand::with_name("create").arg(Arg::with_name("tenantid").required(false).index(1))) ) .subcommand(SubCommand::with_name("status")) .subcommand(SubCommand::with_name("start").about("Start local pageserver")) @@ -62,12 +76,13 @@ fn main() -> Result<()> { SubCommand::with_name("pg") .setting(AppSettings::ArgRequiredElseHelp) .about("Manage postgres instances") - .subcommand(SubCommand::with_name("list")) - .subcommand(SubCommand::with_name("create").arg(timeline_arg.clone())) - .subcommand(SubCommand::with_name("start").arg(timeline_arg.clone())) + .subcommand(SubCommand::with_name("list").arg(tenantid_arg.clone())) + .subcommand(SubCommand::with_name("create").arg(timeline_arg.clone()).arg(tenantid_arg.clone())) + .subcommand(SubCommand::with_name("start").arg(timeline_arg.clone()).arg(tenantid_arg.clone())) .subcommand( SubCommand::with_name("stop") .arg(timeline_arg.clone()) + .arg(tenantid_arg.clone()) .arg( Arg::with_name("destroy") .help("Also delete data directory (now optional, should be default in future)") @@ -101,8 +116,10 @@ fn main() -> Result<()> { // Create config file if let ("init", Some(sub_args)) = matches.subcommand() { + let tenantid = ZTenantId::generate(); let pageserver_uri = sub_args.value_of("pageserver-url"); - local_env::init(pageserver_uri).with_context(|| "Failed to create config file")?; + local_env::init(pageserver_uri, tenantid) + .with_context(|| "Failed to create config file")?; } // all other commands would need config @@ -117,11 +134,17 @@ fn main() -> Result<()> { match matches.subcommand() { ("init", Some(_)) => { let pageserver = PageServerNode::from_env(&env); - if let Err(e) = pageserver.init() { + if let Err(e) = pageserver.init(Some(&env.tenantid.to_string())) { eprintln!("pageserver init failed: {}", e); exit(1); } } + ("tenant", Some(args)) => { + if let Err(e) = handle_tenant(args, &env) { + eprintln!("tenant command failed: {}", e); + exit(1); + } + } ("branch", Some(sub_args)) => { if let Err(e) = handle_branch(sub_args, &env) { @@ -308,9 +331,12 @@ fn print_branch( /// Returns a map of timeline IDs to branch_name@lsn strings. /// Connects to the pageserver to query this information. -fn get_branch_infos(env: &local_env::LocalEnv) -> Result> { +fn get_branch_infos( + env: &local_env::LocalEnv, + tenantid: &ZTenantId, +) -> Result> { let page_server = PageServerNode::from_env(env); - let branch_infos: Vec = page_server.branches_list()?; + let branch_infos: Vec = page_server.branches_list(tenantid)?; let branch_infos: HashMap = branch_infos .into_iter() .map(|branch_info| (branch_info.timeline_id, branch_info)) @@ -319,23 +345,51 @@ fn get_branch_infos(env: &local_env::LocalEnv) -> Result Result<()> { + let pageserver = PageServerNode::from_env(&env); + match tenant_match.subcommand() { + ("list", Some(_)) => { + for tenant in pageserver.tenants_list()? 
{ + println!("{}", tenant); + } + } + ("create", Some(create_match)) => { + let tenantid = match create_match.value_of("tenantid") { + Some(tenantid) => ZTenantId::from_str(tenantid)?, + None => ZTenantId::generate(), + }; + println!("using tenant id {}", tenantid); + pageserver.tenant_create(&tenantid)?; + println!("tenant successfully created on the pageserver"); + } + _ => {} + } + Ok(()) +} + fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let pageserver = PageServerNode::from_env(&env); if let Some(branchname) = branch_match.value_of("branchname") { - if let Some(startpoint_str) = branch_match.value_of("start-point") { - let branch = pageserver.branch_create(branchname, startpoint_str)?; - println!( - "Created branch '{}' at {:?}", - branch.name, - branch.latest_valid_lsn.unwrap_or(Lsn(0)) - ); - } else { - bail!("Missing start-point"); - } + let startpoint_str = branch_match + .value_of("start-point") + .ok_or(anyhow!("Missing start-point"))?; + let tenantid: ZTenantId = branch_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + let branch = pageserver.branch_create(branchname, startpoint_str, &tenantid)?; + println!( + "Created branch '{}' at {:?} for tenant: {}", + branch.name, + branch.latest_valid_lsn.unwrap_or(Lsn(0)), + tenantid, + ); } else { - // No arguments, list branches - let branches = pageserver.branches_list()?; + let tenantid: ZTenantId = branch_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + // No arguments, list branches for tenant + let branches = pageserver.branches_list(&tenantid)?; print_branches_tree(branches)?; } @@ -346,14 +400,21 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let mut cplane = ComputeControlPlane::load(env.clone())?; match pg_match.subcommand() { - ("list", Some(_sub_m)) => { - let branch_infos = get_branch_infos(env).unwrap_or_else(|e| { + ("list", Some(list_match)) => { + let tenantid: ZTenantId = list_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + let branch_infos = get_branch_infos(env, &tenantid).unwrap_or_else(|e| { eprintln!("Failed to load branch info: {}", e); HashMap::new() }); println!("BRANCH\tADDRESS\t\tLSN\t\tSTATUS"); - for (timeline_name, node) in cplane.nodes.iter() { + for ((_, timeline_name), node) in cplane + .nodes + .iter() + .filter(|((node_tenantid, _), _)| node_tenantid == &tenantid) + { println!( "{}\t{}\t{}\t{}", timeline_name, @@ -368,30 +429,42 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { ); } } - ("create", Some(sub_m)) => { - let timeline_name = sub_m.value_of("timeline").unwrap_or("main"); - cplane.new_node(timeline_name)?; - } - ("start", Some(sub_m)) => { - let timeline_name = sub_m.value_of("timeline").unwrap_or("main"); + ("create", Some(create_match)) => { + let tenantid: ZTenantId = create_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + let timeline_name = create_match.value_of("timeline").unwrap_or("main"); + // check is that timeline doesnt already exist + // this check here is because it - let node = cplane.nodes.get(timeline_name); + cplane.new_node(tenantid, timeline_name)?; + } + ("start", Some(start_match)) => { + let tenantid: ZTenantId = start_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + let timeline_name = start_match.value_of("timeline").unwrap_or("main"); + + let node = cplane.nodes.get(&(tenantid, 
timeline_name.to_owned())); println!("Starting postgres on timeline {}...", timeline_name); if let Some(node) = node { node.start()?; } else { - let node = cplane.new_node(timeline_name)?; + let node = cplane.new_node(tenantid, timeline_name)?; node.start()?; } } - ("stop", Some(sub_m)) => { - let timeline_name = sub_m.value_of("timeline").unwrap_or("main"); - let destroy = sub_m.is_present("destroy"); + ("stop", Some(stop_match)) => { + let timeline_name = stop_match.value_of("timeline").unwrap_or("main"); + let destroy = stop_match.is_present("destroy"); + let tenantid: ZTenantId = stop_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; let node = cplane .nodes - .get(timeline_name) + .get(&(tenantid, timeline_name.to_owned())) .ok_or_else(|| anyhow!("postgres {} is not found", timeline_name))?; node.stop(destroy)?; }
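
A note on the command grammar this patch introduces: every pageserver command that used to be addressed purely by timeline (`branch_list`, `do_gc`, `callmemaybe`, plus the new `tenant_create`) is now tenant-qualified and parsed with an anchored regex instead of ad-hoc `split(' ')`. Below is a standalone sketch of the `do_gc` grammar using the same regex as the handler; the `GcCommand` struct and `parse_do_gc` function are illustrative names, not pageserver API:

```rust
use regex::Regex;

// Parsed form of `do_gc <tenantid> <timelineid> [<horizon>]`.
#[derive(Debug)]
struct GcCommand {
    tenantid: String,   // hex-encoded ZTenantId in the real handler
    timelineid: String, // hex-encoded ZTimelineId in the real handler
    horizon: Option<u64>,
}

fn parse_do_gc(query: &str) -> Option<GcCommand> {
    // Same pattern as the handler: two hex ids, then an optional decimal
    // horizon in capture group 4. The handler falls back to conf.gc_horizon
    // when the group is absent; this sketch just surfaces `None`.
    let re = Regex::new(r"^do_gc ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)([[:digit:]]+)?").unwrap();
    let caps = re.captures(query)?;
    Some(GcCommand {
        tenantid: caps.get(1)?.as_str().to_owned(),
        timelineid: caps.get(2)?.as_str().to_owned(),
        horizon: caps.get(4).and_then(|m| m.as_str().parse().ok()),
    })
}

fn main() {
    let query = "do_gc 3d1f7595b468230304e0b73cecbcb081 6e6d5a6b0d8f2a9c5b1e4f3a2c1d0e9f 0";
    println!("{:?}", parse_do_gc(query));
}
```

Because the patterns are anchored, a malformed tenant or timeline id is rejected with an "invalid ..." error before any repository lookup takes place.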
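The patch also replaces the global `page_cache::get_repository()` with `get_repository_for_tenant()`, and `tenant_create` registers a freshly created repository via `insert_repository_for_tenant()`; the bodies of those functions are outside this diff. A minimal sketch of the shape such a registry could take, assuming a process-wide mutex-guarded map (all names below are stand-ins, not the actual `page_cache` internals):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Result};
use lazy_static::lazy_static;

type TenantId = [u8; 16]; // stand-in for ZTenantId

trait Repository: Send + Sync {}

lazy_static! {
    // One repository per tenant, shared across all page-service threads.
    static ref REPOSITORIES: Mutex<HashMap<TenantId, Arc<dyn Repository>>> =
        Mutex::new(HashMap::new());
}

fn insert_repository_for_tenant(tenantid: TenantId, repo: Arc<dyn Repository>) {
    REPOSITORIES.lock().unwrap().insert(tenantid, repo);
}

fn get_repository_for_tenant(tenantid: &TenantId) -> Result<Arc<dyn Repository>> {
    REPOSITORIES
        .lock()
        .unwrap()
        .get(tenantid)
        .cloned()
        .ok_or_else(|| anyhow!("repository for tenant not found"))
}

fn main() -> Result<()> {
    struct Dummy;
    impl Repository for Dummy {}

    let tenantid = [0u8; 16];
    insert_repository_for_tenant(tenantid, Arc::new(Dummy));
    let _repo = get_repository_for_tenant(&tenantid)?;
    Ok(())
}
```

Handing out `Arc` clones keeps the lock held only for the duration of the lookup, so a long GC or WAL-redo call on one tenant's repository never blocks lookups for other tenants.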
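Finally, on disk, everything that used to live at a fixed place under the repository (`rocksdb-storage`, `wal-redo-datadir`, `timelines/<timelineid>/wal`, `timelines/<timelineid>/snapshots`) moves under a per-tenant directory via new `PageServerConf` helpers (`tenant_path`, `wal_dir_path`, `snapshots_path`), whose definitions are also not part of this diff. A sketch of how such helpers could compose the layout, assuming tenants live under `<workdir>/tenants/<tenantid>` and keep the old per-timeline structure below that:

```rust
use std::path::PathBuf;

// Stand-in for the real PageServerConf; only the path helpers are sketched.
struct PageServerConf {
    workdir: PathBuf,
}

impl PageServerConf {
    fn tenant_path(&self, tenantid: &str) -> PathBuf {
        // "tenants" as the parent directory name is an assumption.
        self.workdir.join("tenants").join(tenantid)
    }

    fn timeline_path(&self, timelineid: &str, tenantid: &str) -> PathBuf {
        self.tenant_path(tenantid).join("timelines").join(timelineid)
    }

    fn wal_dir_path(&self, timelineid: &str, tenantid: &str) -> PathBuf {
        self.timeline_path(timelineid, tenantid).join("wal")
    }

    fn snapshots_path(&self, timelineid: &str, tenantid: &str) -> PathBuf {
        self.timeline_path(timelineid, tenantid).join("snapshots")
    }
}

fn main() {
    let conf = PageServerConf {
        workdir: PathBuf::from(".zenith"),
    };
    let wal = conf.wal_dir_path(
        "6e6d5a6b0d8f2a9c5b1e4f3a2c1d0e9f",
        "3d1f7595b468230304e0b73cecbcb081",
    );
    println!("{}", wal.display());
}
```

The argument order (timeline first, then tenant) mirrors the call sites in walreceiver.rs and restore_local_repo.rs above.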