From 767590bbd5d161164aaa7853a0857f43bda190e2 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Thu, 15 Jul 2021 16:30:18 +0300 Subject: [PATCH] support tenants this patch adds support for tenants. This touches mostly pageserver. Directory layout on disk is changed to contain new layer of indirection. Now path to particular repository has the following structure: /tenants/. Tenant id has the same format as timeline id. Tenant id is included in pageserver commands when needed. Also new commands are available in pageserver: tenant_list, tenant_create. This is also reflected CLI. During init default tenant is created and it's id is saved in CLI config, so following commands can use it without extra options. Tenant id is also included in compute postgres configuration, so it can be passed via ServerInfo to safekeeper and in connection string to pageserver. For more info see docs/multitenancy.md. --- Cargo.lock | 4 + README.md | 7 +- control_plane/Cargo.toml | 1 + control_plane/src/compute.rs | 92 ++++++-- control_plane/src/local_env.rs | 25 +- control_plane/src/storage.rs | 72 ++++-- docs/multitenancy.md | 59 +++++ pageserver/src/basebackup.rs | 15 +- pageserver/src/bin/pageserver.rs | 19 +- pageserver/src/branches.rs | 163 ++++++++----- pageserver/src/lib.rs | 192 ++++++++++++--- pageserver/src/logger.rs | 5 +- pageserver/src/object_repository.rs | 7 +- pageserver/src/page_cache.rs | 44 ++-- pageserver/src/page_service.rs | 220 +++++++++++------- pageserver/src/repository.rs | 26 ++- pageserver/src/restore_local_repo.rs | 15 +- pageserver/src/rocksdb_storage.rs | 19 +- pageserver/src/walreceiver.rs | 39 +++- pageserver/src/walredo.rs | 27 ++- .../batch_others/test_branch_behind.py | 5 +- test_runner/batch_others/test_config.py | 4 +- test_runner/batch_others/test_createdb.py | 8 +- test_runner/batch_others/test_createuser.py | 4 +- test_runner/batch_others/test_gc.py | 15 +- test_runner/batch_others/test_multixact.py | 4 +- .../batch_others/test_pageserver_api.py | 
34 ++- test_runner/batch_others/test_pgbench.py | 5 +- .../batch_others/test_restart_compute.py | 3 +- test_runner/batch_others/test_tenants.py | 48 ++++ test_runner/batch_others/test_twophase.py | 11 +- test_runner/batch_others/test_wal_acceptor.py | 11 +- test_runner/batch_others/test_zenith_cli.py | 67 +++++- .../batch_pg_regress/test_isolation.py | 3 +- .../batch_pg_regress/test_pg_regress.py | 3 +- .../batch_pg_regress/test_zenith_regress.py | 3 +- test_runner/fixtures/zenith_fixtures.py | 96 +++++--- vendor/postgres | 2 +- walkeeper/src/receive_wal.rs | 22 +- zenith/src/main.rs | 145 +++++++++--- 40 files changed, 1145 insertions(+), 399 deletions(-) create mode 100644 docs/multitenancy.md create mode 100644 test_runner/batch_others/test_tenants.py diff --git a/Cargo.lock b/Cargo.lock index e65936c6af..9c6e5ba00c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,6 +268,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", + "hex", "lazy_static", "nix", "pageserver", @@ -674,6 +675,9 @@ name = "hex" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +dependencies = [ + "serde", +] [[package]] name = "hex-literal" diff --git a/README.md b/README.md index 0894dc1f1a..07054b2e7e 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ pip install pytest psycopg2 2. Build zenith and patched postgres ```sh -git clone --recursive https://github.com/libzenith/zenith.git +git clone --recursive https://github.com/zenithdb/zenith.git cd zenith make -j5 ``` @@ -34,8 +34,7 @@ make -j5 # Create repository in .zenith with proper paths to binaries and data # Later that would be responsibility of a package install script > ./target/debug/zenith init -<...> -new zenith repository was created in .zenith +pageserver init succeeded # start pageserver > ./target/debug/zenith start @@ -87,7 +86,7 @@ waiting for server to start.... 
done # but all modifications would not affect data in original postgres > psql -p55433 -h 127.0.0.1 postgres postgres=# select * from t; - key | value + key | value -----+------- 1 | 1 (1 row) diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 1a73736b4e..520a694877 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -19,6 +19,7 @@ anyhow = "1.0" bytes = "1.0.1" nix = "0.20" url = "2.2.2" +hex = { version = "0.4.3", features = ["serde"] } pageserver = { path = "../pageserver" } walkeeper = { path = "../walkeeper" } diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index a7059e3c3f..5122f205f2 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -17,7 +17,7 @@ use regex::Regex; use zenith_utils::connstring::connection_host_port; use crate::local_env::LocalEnv; -use pageserver::ZTimelineId; +use pageserver::{ZTenantId, ZTimelineId}; use crate::storage::PageServerNode; @@ -27,27 +27,36 @@ use crate::storage::PageServerNode; pub struct ComputeControlPlane { base_port: u16, pageserver: Arc, - pub nodes: BTreeMap>, + pub nodes: BTreeMap<(ZTenantId, String), Arc>, env: LocalEnv, } impl ComputeControlPlane { // Load current nodes with ports from data directories on disk + // Directory structure has the following layout: + // pgdatadirs + // |- tenants + // | |- + // | | |- pub fn load(env: LocalEnv) -> Result { // TODO: since pageserver do not have config file yet we believe here that // it is running on default port. Change that when pageserver will have config. let pageserver = Arc::new(PageServerNode::from_env(&env)); + let mut nodes = BTreeMap::default(); let pgdatadirspath = &env.pg_data_dirs_path(); - let nodes: Result> = fs::read_dir(&pgdatadirspath) + + for tenant_dir in fs::read_dir(&pgdatadirspath) .with_context(|| format!("failed to list {}", pgdatadirspath.display()))? 
- .into_iter() - .map(|f| { - PostgresNode::from_dir_entry(f?, &env, &pageserver) - .map(|node| (node.name.clone(), Arc::new(node))) - }) - .collect(); - let nodes = nodes?; + { + let tenant_dir = tenant_dir?; + for timeline_dir in fs::read_dir(tenant_dir.path()) + .with_context(|| format!("failed to list {}", tenant_dir.path().display()))? + { + let node = PostgresNode::from_dir_entry(timeline_dir?, &env, &pageserver)?; + nodes.insert((node.tenantid, node.name.clone()), Arc::new(node)); + } + } Ok(ComputeControlPlane { base_port: 55431, @@ -82,6 +91,7 @@ impl ComputeControlPlane { is_test: bool, timelineid: ZTimelineId, name: &str, + tenantid: ZTenantId, ) -> Result> { let node = Arc::new(PostgresNode { name: name.to_owned(), @@ -90,19 +100,26 @@ impl ComputeControlPlane { pageserver: Arc::clone(&self.pageserver), is_test, timelineid, + tenantid, }); node.init_from_page_server()?; - self.nodes.insert(node.name.clone(), Arc::clone(&node)); + self.nodes + .insert((tenantid, node.name.clone()), Arc::clone(&node)); Ok(node) } - pub fn new_node(&mut self, branch_name: &str) -> Result> { - let timeline_id = self.pageserver.branch_get_by_name(branch_name)?.timeline_id; - - let node = self.new_from_page_server(false, timeline_id, branch_name)?; - + pub fn new_node( + &mut self, + tenantid: ZTenantId, + branch_name: &str, + ) -> Result> { + let timeline_id = self + .pageserver + .branch_get_by_name(&tenantid, branch_name)? + .timeline_id; + let node = self.new_from_page_server(false, timeline_id, branch_name, tenantid)?; // Configure the node to stream WAL directly to the pageserver node.append_conf( "postgresql.conf", @@ -112,7 +129,7 @@ impl ComputeControlPlane { "synchronous_standby_names = 'pageserver'\n", // TODO: add a new function arg? 
"zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping ), - node.connstr() + node.connstr(), ) .as_str(), )?; @@ -123,6 +140,7 @@ impl ComputeControlPlane { /////////////////////////////////////////////////////////////////////////////// +#[derive(Debug)] pub struct PostgresNode { pub address: SocketAddr, name: String, @@ -130,6 +148,7 @@ pub struct PostgresNode { pageserver: Arc, is_test: bool, pub timelineid: ZTimelineId, + pub tenantid: ZTenantId, } impl PostgresNode { @@ -149,6 +168,8 @@ impl PostgresNode { static ref CONF_PORT_RE: Regex = Regex::new(r"(?m)^\s*port\s*=\s*(\d+)\s*$").unwrap(); static ref CONF_TIMELINE_RE: Regex = Regex::new(r"(?m)^\s*zenith.zenith_timeline\s*=\s*'(\w+)'\s*$").unwrap(); + static ref CONF_TENANT_RE: Regex = + Regex::new(r"(?m)^\s*zenith.zenith_tenant\s*=\s*'(\w+)'\s*$").unwrap(); } // parse data directory name @@ -196,6 +217,22 @@ impl PostgresNode { .parse() .with_context(|| err_msg)?; + // parse tenant + let err_msg = format!( + "failed to find tenant definition in config file {}", + cfg_path.to_str().unwrap() + ); + let tenantid = CONF_TENANT_RE + .captures(config.as_str()) + .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 1"))? + .iter() + .last() + .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 2"))? + .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 3"))? 
+ .as_str() + .parse() + .with_context(|| err_msg)?; + // ok now Ok(PostgresNode { address: SocketAddr::new("127.0.0.1".parse().unwrap(), port), @@ -204,6 +241,7 @@ impl PostgresNode { pageserver: Arc::clone(pageserver), is_test: false, timelineid, + tenantid, }) } @@ -223,7 +261,7 @@ impl PostgresNode { fs::remove_dir_all(&pgdata).ok(); } - let sql = format!("basebackup {}", self.timelineid); + let sql = format!("basebackup {} {}", self.tenantid, self.timelineid); let mut client = self .pageserver .page_server_psql_client() @@ -293,12 +331,16 @@ impl PostgresNode { let (host, port) = connection_host_port(&self.pageserver.connection_config()); self.append_conf( "postgresql.conf", - &format!( - "shared_preload_libraries = zenith \n\ - zenith.page_server_connstring = 'host={} port={}'\n\ - zenith.zenith_timeline='{}'\n", - host, port, self.timelineid - ), + format!( + concat!( + "shared_preload_libraries = zenith\n", + "zenith.page_server_connstring = 'host={} port={}'\n", + "zenith.zenith_timeline='{}'\n", + "zenith.zenith_tenant='{}'\n", + ), + host, port, self.timelineid, self.tenantid, + ) + .as_str(), )?; fs::create_dir_all(self.pgdata().join("pg_wal"))?; @@ -307,7 +349,7 @@ impl PostgresNode { } pub fn pgdata(&self) -> PathBuf { - self.env.pg_data_dir(&self.name) + self.env.pg_data_dir(&self.tenantid, &self.name) } pub fn status(&self) -> &str { diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index fabc809bfc..7f25f105bd 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -5,6 +5,8 @@ // script which will use local paths. 
// use anyhow::{anyhow, Result}; +use hex; +use pageserver::ZTenantId; use serde::{Deserialize, Serialize}; use std::fs; use std::path::PathBuf; @@ -16,7 +18,7 @@ pub type Remotes = BTreeMap; // // This data structures represent deserialized zenith CLI config // -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct LocalEnv { // Pageserver connection strings pub pageserver_connstring: String, @@ -33,6 +35,10 @@ pub struct LocalEnv { // Path to pageserver binary. Empty for remote pageserver. pub zenith_distrib_dir: Option, + // keeping tenant id in config to reduce copy paste when running zenith locally with single tenant + #[serde(with = "hex")] + pub tenantid: ZTenantId, + pub remotes: Remotes, } @@ -54,11 +60,13 @@ impl LocalEnv { } pub fn pg_data_dirs_path(&self) -> PathBuf { - self.base_data_dir.join("pgdatadirs") + self.base_data_dir.join("pgdatadirs").join("tenants") } - pub fn pg_data_dir(&self, name: &str) -> PathBuf { - self.pg_data_dirs_path().join(name) + pub fn pg_data_dir(&self, tenantid: &ZTenantId, branch_name: &str) -> PathBuf { + self.pg_data_dirs_path() + .join(tenantid.to_string()) + .join(branch_name) } // TODO: move pageserver files into ./pageserver @@ -77,7 +85,7 @@ fn base_path() -> PathBuf { // // Initialize a new Zenith repository // -pub fn init(remote_pageserver: Option<&str>) -> Result<()> { +pub fn init(remote_pageserver: Option<&str>, tenantid: ZTenantId) -> Result<()> { // check if config already exists let base_path = base_path(); if base_path.exists() { @@ -102,9 +110,6 @@ pub fn init(remote_pageserver: Option<&str>) -> Result<()> { anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir); } - fs::create_dir(&base_path)?; - fs::create_dir(base_path.join("pgdatadirs"))?; - let conf = if let Some(addr) = remote_pageserver { // check that addr is parsable let _uri = Url::parse(addr).map_err(|e| anyhow!("{}: {}", addr, e))?; @@ -115,6 +120,7 @@ pub fn init(remote_pageserver: 
Option<&str>) -> Result<()> { zenith_distrib_dir: None, base_data_dir: base_path, remotes: BTreeMap::default(), + tenantid, } } else { // Find zenith binaries. @@ -129,9 +135,12 @@ pub fn init(remote_pageserver: Option<&str>) -> Result<()> { zenith_distrib_dir: Some(zenith_distrib_dir), base_data_dir: base_path, remotes: BTreeMap::default(), + tenantid, } }; + fs::create_dir_all(conf.pg_data_dirs_path())?; + let toml = toml::to_string_pretty(&conf)?; fs::write(conf.base_data_dir.join("config"), toml)?; diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 3f73ceccc6..d12e8d3366 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -8,6 +8,7 @@ use std::time::Duration; use anyhow::{anyhow, bail, Result}; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; +use pageserver::ZTenantId; use postgres::{Config, NoTls}; use crate::local_env::LocalEnv; @@ -20,6 +21,7 @@ use zenith_utils::connstring::connection_address; // // Used in CLI and tests. 
// +#[derive(Debug)] pub struct PageServerNode { pub kill_on_exit: bool, pub connection_config: Option, @@ -48,16 +50,18 @@ impl PageServerNode { } } - pub fn init(&self) -> Result<()> { + pub fn init(&self, create_tenant: Option<&str>) -> Result<()> { let mut cmd = Command::new(self.env.pageserver_bin()?); + let mut args = vec![ + "--init", + "-D", + self.env.base_data_dir.to_str().unwrap(), + "--postgres-distrib", + self.env.pg_distrib_dir.to_str().unwrap(), + ]; + create_tenant.map(|tenantid| args.extend(&["--create-tenant", tenantid])); let status = cmd - .args(&[ - "--init", - "-D", - self.env.base_data_dir.to_str().unwrap(), - "--postgres-distrib", - self.env.pg_distrib_dir.to_str().unwrap(), - ]) + .args(args) .env_clear() .env("RUST_BACKTRACE", "1") .status() @@ -148,9 +152,30 @@ impl PageServerNode { self.connection_config().connect(NoTls) } - pub fn branches_list(&self) -> Result> { + pub fn tenants_list(&self) -> Result> { let mut client = self.page_server_psql_client()?; - let query_result = client.simple_query("branch_list")?; + let query_result = client.simple_query("tenant_list")?; + let tenants_json = query_result + .first() + .map(|msg| match msg { + postgres::SimpleQueryMessage::Row(row) => row.get(0), + _ => None, + }) + .flatten() + .ok_or_else(|| anyhow!("missing tenants"))?; + + Ok(serde_json::from_str(tenants_json)?) 
+ } + + pub fn tenant_create(&self, tenantid: &ZTenantId) -> Result<()> { + let mut client = self.page_server_psql_client()?; + client.simple_query(format!("tenant_create {}", tenantid).as_str())?; + Ok(()) + } + + pub fn branches_list(&self, tenantid: &ZTenantId) -> Result> { + let mut client = self.page_server_psql_client()?; + let query_result = client.simple_query(&format!("branch_list {}", tenantid))?; let branches_json = query_result .first() .map(|msg| match msg { @@ -160,14 +185,19 @@ impl PageServerNode { .flatten() .ok_or_else(|| anyhow!("missing branches"))?; - let res: Vec = serde_json::from_str(branches_json)?; - Ok(res) + Ok(serde_json::from_str(branches_json)?) } - pub fn branch_create(&self, name: &str, startpoint: &str) -> Result { + pub fn branch_create( + &self, + branch_name: &str, + startpoint: &str, + tenantid: &ZTenantId, + ) -> Result { let mut client = self.page_server_psql_client()?; - let query_result = - client.simple_query(format!("branch_create {} {}", name, startpoint).as_str())?; + let query_result = client.simple_query( + format!("branch_create {} {} {}", tenantid, branch_name, startpoint).as_str(), + )?; let branch_json = query_result .first() @@ -190,8 +220,12 @@ impl PageServerNode { } // TODO: make this a separate request type and avoid loading all the branches - pub fn branch_get_by_name(&self, name: &str) -> Result { - let branch_infos = self.branches_list()?; + pub fn branch_get_by_name( + &self, + tenantid: &ZTenantId, + branch_name: &str, + ) -> Result { + let branch_infos = self.branches_list(tenantid)?; let branche_by_name: Result> = branch_infos .into_iter() .map(|branch_info| Ok((branch_info.name.clone(), branch_info))) @@ -199,8 +233,8 @@ impl PageServerNode { let branche_by_name = branche_by_name?; let branch = branche_by_name - .get(name) - .ok_or_else(|| anyhow!("Branch {} not found", name))?; + .get(branch_name) + .ok_or_else(|| anyhow!("Branch {} not found", branch_name))?; Ok(branch.clone()) } diff --git 
a/docs/multitenancy.md b/docs/multitenancy.md new file mode 100644 index 0000000000..7016509375 --- /dev/null +++ b/docs/multitenancy.md @@ -0,0 +1,59 @@ +## Multitenancy + +### Overview + +Zenith supports multitenancy. One pageserver can serve multiple tenants at once. Tenants can be managed via the zenith CLI. During page server setup a tenant can be created using ```zenith init --create-tenant``` Also tenants can be added into the system on the fly without a pageserver restart. This can be done using the following cli command: ```zenith tenant create``` Tenants use random identifiers which can be represented as a 32-symbol hexadecimal string. So zenith tenant create accepts the desired tenant id as an optional argument. The concept of timelines/branches works independently per tenant. + +### Tenants in other commands + +By default during `zenith init` a new tenant is created on the pageserver. The newly created tenant's id is saved to the cli config, so other commands can use it automatically if no direct argument `--tenantid=` is provided. So generally tenantid appears more frequently in the internal pageserver interface. Its commands take a tenantid argument to distinguish which tenant the operation should be applied to. The CLI supports creation of new tenants.
+ Examples for cli: + +```sh +zenith tenant list + +zenith tenant create // generates new id + +zenith tenant create ee6016ec31116c1b7c33dfdfca38892f + +zenith pg create main // default tenant from zenith init + +zenith pg create main --tenantid=ee6016ec31116c1b7c33dfdfca38892f + +zenith branch --tenantid=ee6016ec31116c1b7c33dfdfca38892f +``` + +### Data layout + +On the page server tenants introduce one level of indirection, so the data directory is structured the following way: +``` + +├── pageserver.log +├── pageserver.pid +├── pageserver.toml +└── tenants + ├── 537cffa58a4fa557e49e19951b5a9d6b + ├── de182bc61fb11a5a6b390a8aed3a804a + └── ee6016ec31116c1b7c33dfdfca38891f +``` +WAL redo activity, timelines, and snapshots are managed for each tenant independently. + +For the local environment used, for example, in tests there is also a new level of indirection for tenants. It touches the `pgdatadirs` directory. Now it contains a `tenants` subdirectory so the structure looks the following way: + +``` +pgdatadirs +└── tenants + ├── de182bc61fb11a5a6b390a8aed3a804a + │ └── main + └── ee6016ec31116c1b7c33dfdfca38892f + └── main +``` + +### Changes to postgres + +Tenant id is passed to postgres via GUC the same way as the timeline. Tenant id is added to commands issued to pageserver, namely: pagestream, callmemaybe. Tenant id also exists in the ServerInfo structure; this is needed to pass the value to the wal receiver so that it can forward it to the pageserver. + +### Safety + +For now a particular tenant can only appear on a particular pageserver. The set of WAL acceptors is also pinned to a particular (tenantid, timeline) pair so there can only be one writer for a particular (tenantid, timeline). diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 8b5e759eed..1b6b939db4 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -8,10 +8,11 @@ //! and generate pg_control and dummy segment of WAL.
This module is responsible for creation of such tarball from snapshot directory and //! data stored in object storage. //! -use crate::ZTimelineId; +use crate::{PageServerConf, ZTenantId, ZTimelineId}; use bytes::{BufMut, BytesMut}; use log::*; use std::io::Write; +use std::path::PathBuf; use std::sync::Arc; use std::time::SystemTime; use tar::{Builder, Header}; @@ -32,7 +33,7 @@ pub struct Basebackup<'a> { timeline: &'a Arc, lsn: Lsn, prev_record_lsn: Lsn, - snappath: String, + snappath: PathBuf, slru_buf: [u8; pg_constants::SLRU_SEG_SIZE], slru_segno: u32, slru_path: &'static str, @@ -40,7 +41,9 @@ pub struct Basebackup<'a> { impl<'a> Basebackup<'a> { pub fn new( + conf: &PageServerConf, write: &'a mut dyn Write, + tenantid: ZTenantId, timelineid: ZTimelineId, timeline: &'a Arc, lsn: Lsn, @@ -52,7 +55,9 @@ impl<'a> Basebackup<'a> { timeline, lsn, prev_record_lsn, - snappath: format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0), + snappath: conf + .snapshots_path(&timelineid, &tenantid) + .join(format!("{:016X}", snapshot_lsn.0)), slru_path: "", slru_segno: u32::MAX, slru_buf: [0u8; pg_constants::SLRU_SEG_SIZE], @@ -60,7 +65,7 @@ impl<'a> Basebackup<'a> { } pub fn send_tarball(&mut self) -> anyhow::Result<()> { - debug!("sending tarball of snapshot in {}", self.snappath); + debug!("sending tarball of snapshot in {}", self.snappath.display()); for entry in WalkDir::new(&self.snappath) { let entry = entry?; let fullpath = entry.path(); @@ -179,7 +184,7 @@ impl<'a> Basebackup<'a> { } else { // User defined tablespaces are not supported assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID); - let src_path = format!("{}/base/1/PG_VERSION", self.snappath); + let src_path = self.snappath.join("base/1/PG_VERSION"); let dst_path = format!("base/{}/PG_VERSION", db.dbnode); self.ar.append_path_with_name(&src_path, &dst_path)?; format!("base/{}/pg_filenode.map", db.dbnode) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs 
index 4606f08bc6..31f96e366e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -6,18 +6,18 @@ use log::*; use serde::{Deserialize, Serialize}; use std::{ env, - net::{TcpListener}, + net::TcpListener, path::{Path, PathBuf}, process::exit, thread, time::Duration, }; -use anyhow::{Result}; +use anyhow::Result; use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; -use pageserver::{branches, page_cache, page_service, tui, logger, PageServerConf}; +use pageserver::{branches, logger, page_cache, page_service, tui, PageServerConf}; const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000"; @@ -158,6 +158,13 @@ fn main() -> Result<()> { .takes_value(true) .help("Postgres distribution directory"), ) + .arg( + Arg::with_name("create-tenant") + .long("create-tenant") + .takes_value(true) + .help("Create tenant during init") + .requires("init"), + ) .get_matches(); let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith")); @@ -166,6 +173,8 @@ fn main() -> Result<()> { let args_params = CfgFileParams::from_args(&arg_matches); let init = arg_matches.is_present("init"); + let create_tenant = arg_matches.value_of("create-tenant"); + let params = if init { // We're initializing the repo, so there's no config file yet args_params @@ -199,8 +208,7 @@ fn main() -> Result<()> { // Create repo and exit if init was requested if init { - branches::init_repo(conf, &workdir)?; - + branches::init_pageserver(conf, workdir, create_tenant)?; // write the config file let cfg_file_contents = toml::to_string_pretty(¶ms)?; std::fs::write(&cfg_file_path, cfg_file_contents)?; @@ -215,7 +223,6 @@ fn main() -> Result<()> { } fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { - // Initialize logger let (_scope_guard, log_file) = logger::init_logging(&conf, "pageserver.log")?; let _log_guard = slog_stdlog::init()?; diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index c0e26746b0..0a2218ae35 100644 --- 
a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -4,15 +4,15 @@ // TODO: move all paths construction to conf impl // -use anyhow::{bail, Context, Result}; +use anyhow::{bail, ensure, Context, Result}; use fs::File; use postgres_ffi::{pg_constants, xlog_utils, ControlFileData}; -use rand::Rng; use serde::{Deserialize, Serialize}; use std::env; -use std::io::{Read, Write}; +use std::io::Read; +use std::sync::Arc; use std::{ - fs, io, + fs, path::{Path, PathBuf}, process::{Command, Stdio}, str::FromStr, @@ -22,8 +22,11 @@ use zenith_utils::lsn::Lsn; use log::*; use crate::logger; +use crate::object_repository::ObjectRepository; use crate::page_cache; use crate::restore_local_repo; +use crate::walredo::WalRedoManager; +use crate::ZTenantId; use crate::{repository::Repository, PageServerConf, ZTimelineId}; #[derive(Serialize, Deserialize, Clone)] @@ -41,35 +44,61 @@ pub struct PointInTime { pub lsn: Lsn, } -pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { - // top-level dir may exist if we are creating it through CLI - fs::create_dir_all(repo_dir) - .with_context(|| format!("could not create directory {}", repo_dir.display()))?; - - env::set_current_dir(repo_dir)?; - +pub fn init_pageserver( + conf: &'static PageServerConf, + workdir: &Path, + create_tenant: Option<&str>, +) -> Result<()> { // Initialize logger let (_scope_guard, _log_file) = logger::init_logging(&conf, "pageserver.log")?; let _log_guard = slog_stdlog::init()?; + env::set_current_dir(workdir)?; + if let Some(tenantid) = create_tenant { + let tenantid = ZTenantId::from_str(tenantid)?; + println!("initializing tenantid {}", tenantid); + create_repo( + conf, + tenantid, + Arc::new(crate::walredo::DummyRedoManager {}), + ) + .with_context(|| "failed to create repo")?; + } + fs::create_dir_all(conf.tenants_path())?; + println!("pageserver init succeeded"); + Ok(()) +} + +pub fn create_repo( + conf: &'static PageServerConf, + tenantid: ZTenantId, + 
wal_redo_manager: Arc, +) -> Result { + let repo_dir = conf.tenant_path(&tenantid); + if repo_dir.exists() { + bail!("repo for {} already exists", tenantid) + } + + // top-level dir may exist if we are creating it through CLI + fs::create_dir_all(&repo_dir) + .with_context(|| format!("could not create directory {}", repo_dir.display()))?; + // Note: this `info!(...)` macro comes from `log` crate info!("standard logging redirected to slog"); - fs::create_dir(std::path::Path::new("timelines"))?; - fs::create_dir(std::path::Path::new("refs"))?; - fs::create_dir(std::path::Path::new("refs").join("branches"))?; - fs::create_dir(std::path::Path::new("refs").join("tags"))?; + fs::create_dir(conf.timelines_path(&tenantid))?; + fs::create_dir_all(conf.branches_path(&tenantid))?; + fs::create_dir_all(conf.tags_path(&tenantid))?; - println!("created directory structure in {}", repo_dir.display()); + info!("created directory structure in {}", repo_dir.display()); // Run initdb // // We create the cluster temporarily in a "tmp" directory inside the repository, // and move it to the right location from there. - let tmppath = std::path::Path::new("tmp"); + let tmppath = conf.tenant_path(&tenantid).join("tmp"); - print!("running initdb... "); - io::stdout().flush()?; + info!("running initdb... "); let initdb_path = conf.pg_bin_dir().join("initdb"); let initdb_otput = Command::new(initdb_path) @@ -88,7 +117,7 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { String::from_utf8_lossy(&initdb_otput.stderr) ); } - println!("initdb succeeded"); + info!("initdb succeeded"); // Read control file to extract the LSN and system id let controlfile_path = tmppath.join("global").join("pg_control"); @@ -98,8 +127,8 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { let lsnstr = format!("{:016X}", lsn); // Bootstrap the repository by loading the newly-initdb'd cluster into 'main' branch. 
- let tli = create_timeline(conf, None)?; - let timelinedir = conf.timeline_path(tli); + let tli = create_timeline(conf, None, &tenantid)?; + let timelinedir = conf.timeline_path(&tli, &tenantid); // We don't use page_cache here, because we don't want to spawn the WAL redo thread during // repository initialization. @@ -110,12 +139,13 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { // and we failed to run initdb again in the same directory. This has been solved for the // rapid init+start case now, but the general race condition remains if you restart the // server quickly. - let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?; + let storage = crate::rocksdb_storage::RocksObjectStore::create(conf, &tenantid)?; let repo = crate::object_repository::ObjectRepository::new( conf, std::sync::Arc::new(storage), - std::sync::Arc::new(crate::walredo::DummyRedoManager {}), + wal_redo_manager, + tenantid, ); let timeline = repo.create_empty_timeline(tli, Lsn(lsn))?; @@ -128,11 +158,11 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { .join("wal") .join("000000010000000000000001.partial"), )?; - println!("created initial timeline {}", tli); + info!("created initial timeline {}", tli); let data = tli.to_string(); - fs::write(conf.branch_path("main"), data)?; - println!("created main branch"); + fs::write(conf.branch_path("main", &tenantid), data)?; + info!("created main branch"); // Remove pg_wal fs::remove_dir_all(tmppath.join("pg_wal"))?; @@ -143,20 +173,32 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { let target = timelinedir.join("snapshots").join(&lsnstr); fs::rename(tmppath, &target)?; - println!( + info!( "new zenith repository was created in {}", repo_dir.display() ); - Ok(()) + Ok(repo) } -pub(crate) fn get_branches(conf: &PageServerConf) -> Result> { - let repo = page_cache::get_repository(); +pub(crate) fn get_tenants(conf: &PageServerConf) -> 
Result> { + let tenants_dir = conf.tenants_path(); + + std::fs::read_dir(&tenants_dir)? + .map(|dir_entry_res| { + let dir_entry = dir_entry_res?; + ensure!(dir_entry.file_type()?.is_dir()); + Ok(dir_entry.file_name().to_str().unwrap().to_owned()) + }) + .collect() +} + +pub(crate) fn get_branches(conf: &PageServerConf, tenantid: &ZTenantId) -> Result> { + let repo = page_cache::get_repository_for_tenant(tenantid)?; // Each branch has a corresponding record (text file) in the refs/branches // with timeline_id. - let branches_dir = std::path::Path::new("refs").join("branches"); + let branches_dir = conf.branches_path(tenantid); std::fs::read_dir(&branches_dir)? .map(|dir_entry_res| { @@ -169,7 +211,7 @@ pub(crate) fn get_branches(conf: &PageServerConf) -> Result> { .map(|timeline| timeline.get_last_valid_lsn()) .ok(); - let ancestor_path = conf.ancestor_path(timeline_id); + let ancestor_path = conf.ancestor_path(&timeline_id, tenantid); let mut ancestor_id: Option = None; let mut ancestor_lsn: Option = None; @@ -206,14 +248,15 @@ pub(crate) fn create_branch( conf: &PageServerConf, branchname: &str, startpoint_str: &str, + tenantid: &ZTenantId, ) -> Result { - let repo = page_cache::get_repository(); + let repo = page_cache::get_repository_for_tenant(tenantid)?; - if conf.branch_path(&branchname).exists() { + if conf.branch_path(branchname, tenantid).exists() { anyhow::bail!("branch {} already exists", branchname); } - let mut startpoint = parse_point_in_time(conf, startpoint_str)?; + let mut startpoint = parse_point_in_time(conf, startpoint_str, tenantid)?; if startpoint.lsn == Lsn(0) { // Find end of WAL on the old timeline @@ -225,19 +268,20 @@ pub(crate) fn create_branch( } // create a new timeline directory for it - let newtli = create_timeline(conf, Some(startpoint))?; - let newtimelinedir = conf.timeline_path(newtli); + let newtli = create_timeline(conf, Some(startpoint), tenantid)?; + let newtimelinedir = conf.timeline_path(&newtli, tenantid); // Let the 
Repository backend do its initialization repo.branch_timeline(startpoint.timelineid, newtli, startpoint.lsn)?; // Copy the latest snapshot (TODO: before the startpoint) and all WAL // TODO: be smarter and avoid the copying... - let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(conf, startpoint.timelineid)?; + let (_maxsnapshot, oldsnapshotdir) = + find_latest_snapshot(conf, &startpoint.timelineid, tenantid)?; let copy_opts = fs_extra::dir::CopyOptions::new(); fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), ©_opts)?; - let oldtimelinedir = conf.timeline_path(startpoint.timelineid); + let oldtimelinedir = conf.timeline_path(&startpoint.timelineid, tenantid); copy_wal( &oldtimelinedir.join("wal"), &newtimelinedir.join("wal"), @@ -249,7 +293,7 @@ pub(crate) fn create_branch( // FIXME: there's a race condition, if you create a branch with the same // name concurrently. let data = newtli.to_string(); - fs::write(conf.branch_path(&branchname), data)?; + fs::write(conf.branch_path(&branchname, tenantid), data)?; Ok(BranchInfo { name: branchname.to_string(), @@ -279,7 +323,11 @@ pub(crate) fn create_branch( // mytag // // -fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { +fn parse_point_in_time( + conf: &PageServerConf, + s: &str, + tenantid: &ZTenantId, +) -> Result { let mut strings = s.split('@'); let name = strings.next().unwrap(); @@ -294,21 +342,21 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { // Check if it's a tag if lsn.is_none() { - let tagpath = conf.tag_path(name); + let tagpath = conf.tag_path(name, &tenantid); if tagpath.exists() { let pointstr = fs::read_to_string(tagpath)?; - return parse_point_in_time(conf, &pointstr); + return parse_point_in_time(conf, &pointstr, &tenantid); } } // Check if it's a branch // Check if it's branch @ LSN - let branchpath = conf.branch_path(name); + let branchpath = conf.branch_path(name, &tenantid); if branchpath.exists() { let pointstr = 
fs::read_to_string(branchpath)?; - let mut result = parse_point_in_time(conf, &pointstr)?; + let mut result = parse_point_in_time(conf, &pointstr, &tenantid)?; result.lsn = lsn.unwrap_or(Lsn(0)); return Ok(result); @@ -317,7 +365,7 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { // Check if it's a timelineid // Check if it's timelineid @ LSN if let Ok(timelineid) = ZTimelineId::from_str(name) { - let tlipath = conf.timeline_path(timelineid); + let tlipath = conf.timeline_path(&timelineid, &tenantid); if tlipath.exists() { return Ok(PointInTime { timelineid, @@ -329,13 +377,16 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { bail!("could not parse point-in-time {}", s); } -fn create_timeline(conf: &PageServerConf, ancestor: Option) -> Result { +fn create_timeline( + conf: &PageServerConf, + ancestor: Option, + tenantid: &ZTenantId, +) -> Result { // Create initial timeline - let mut tli_buf = [0u8; 16]; - rand::thread_rng().fill(&mut tli_buf); - let timelineid = ZTimelineId::from(tli_buf); - let timelinedir = conf.timeline_path(timelineid); + let timelineid = ZTimelineId::generate(); + + let timelinedir = conf.timeline_path(&timelineid, tenantid); fs::create_dir(&timelinedir)?; fs::create_dir(&timelinedir.join("snapshots"))?; @@ -397,8 +448,12 @@ fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: usize) -> R } // Find the latest snapshot for a timeline -fn find_latest_snapshot(conf: &PageServerConf, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> { - let snapshotsdir = conf.snapshots_path(timeline); +fn find_latest_snapshot( + conf: &PageServerConf, + timelineid: &ZTimelineId, + tenantid: &ZTenantId, +) -> Result<(Lsn, PathBuf)> { + let snapshotsdir = conf.snapshots_path(timelineid, tenantid); let paths = fs::read_dir(&snapshotsdir)?; let mut maxsnapshot = Lsn(0); let mut snapshotdir: Option = None; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 8a3b89e02f..f9c3703d77 100644 --- 
a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,3 +1,5 @@ +use hex::FromHex; +use rand::Rng; use serde::{Deserialize, Serialize}; use std::fmt; @@ -48,24 +50,48 @@ impl PageServerConf { // Repository paths, relative to workdir. // - fn tag_path(&self, name: &str) -> PathBuf { - self.workdir.join("refs").join("tags").join(name) + fn tenants_path(&self) -> PathBuf { + self.workdir.join("tenants") } - fn branch_path(&self, name: &str) -> PathBuf { - self.workdir.join("refs").join("branches").join(name) + fn tenant_path(&self, tenantid: &ZTenantId) -> PathBuf { + self.tenants_path().join(tenantid.to_string()) } - fn timeline_path(&self, timelineid: ZTimelineId) -> PathBuf { - self.workdir.join("timelines").join(timelineid.to_string()) + fn tags_path(&self, tenantid: &ZTenantId) -> PathBuf { + self.tenant_path(tenantid).join("refs").join("tags") } - fn snapshots_path(&self, timelineid: ZTimelineId) -> PathBuf { - self.timeline_path(timelineid).join("snapshots") + fn tag_path(&self, tag_name: &str, tenantid: &ZTenantId) -> PathBuf { + self.tags_path(tenantid).join(tag_name) } - fn ancestor_path(&self, timelineid: ZTimelineId) -> PathBuf { - self.timeline_path(timelineid).join("ancestor") + fn branches_path(&self, tenantid: &ZTenantId) -> PathBuf { + self.tenant_path(tenantid).join("refs").join("branches") + } + + fn branch_path(&self, branch_name: &str, tenantid: &ZTenantId) -> PathBuf { + self.branches_path(tenantid).join(branch_name) + } + + fn timelines_path(&self, tenantid: &ZTenantId) -> PathBuf { + self.tenant_path(tenantid).join("timelines") + } + + fn timeline_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { + self.timelines_path(tenantid).join(timelineid.to_string()) + } + + fn snapshots_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { + self.timeline_path(timelineid, tenantid).join("snapshots") + } + + fn ancestor_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { + 
self.timeline_path(timelineid, tenantid).join("ancestor") + } + + fn wal_dir_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { + self.timeline_path(timelineid, tenantid).join("wal") } // @@ -81,8 +107,67 @@ impl PageServerConf { } } -/// Zenith Timeline ID is a 128-bit random ID. -/// +// Zenith ID is a 128-bit random ID. +// Used to represent various identifiers. Provides handy utility methods and impls. +// TODO (LizardWizzard) figure out best way to remove boiler plate with trait impls caused by newtype pattern +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] +struct ZId([u8; 16]); + +impl ZId { + pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZId { + let mut arr = [0u8; 16]; + buf.copy_to_slice(&mut arr); + ZId::from(arr) + } + + pub fn as_arr(&self) -> [u8; 16] { + self.0 + } + + pub fn generate() -> Self { + let mut tli_buf = [0u8; 16]; + rand::thread_rng().fill(&mut tli_buf); + ZId::from(tli_buf) + } +} + +impl FromStr for ZId { + type Err = hex::FromHexError; + + fn from_str(s: &str) -> Result { + Self::from_hex(s) + } +} + +// this is needed for pretty serialization and deserialization of ZId's using serde integration with hex crate +impl FromHex for ZId { + type Error = hex::FromHexError; + + fn from_hex>(hex: T) -> Result { + let mut buf: [u8; 16] = [0u8; 16]; + hex::decode_to_slice(hex, &mut buf)?; + Ok(ZId(buf)) + } +} + +impl AsRef<[u8]> for ZId { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl From<[u8; 16]> for ZId { + fn from(b: [u8; 16]) -> Self { + ZId(b) + } +} + +impl fmt::Display for ZId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&hex::encode(self.0)) + } +} + /// Zenith timeline IDs are different from PostgreSQL timeline /// IDs. They serve a similar purpose though: they differentiate /// between different "histories" of the same cluster. However, @@ -106,38 +191,93 @@ impl PageServerConf { /// limitations. 
A zenith timeline is identified by a 128-bit ID, which /// is usually printed out as a hex string. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct ZTimelineId([u8; 16]); +pub struct ZTimelineId(ZId); impl FromStr for ZTimelineId { type Err = hex::FromHexError; fn from_str(s: &str) -> Result { - let timelineid = hex::decode(s)?; + let value = ZId::from_str(s)?; + Ok(ZTimelineId(value)) + } +} - let mut buf: [u8; 16] = [0u8; 16]; - buf.copy_from_slice(timelineid.as_slice()); - Ok(ZTimelineId(buf)) +impl From<[u8; 16]> for ZTimelineId { + fn from(b: [u8; 16]) -> Self { + ZTimelineId(ZId::from(b)) } } impl ZTimelineId { - pub fn from(b: [u8; 16]) -> ZTimelineId { - ZTimelineId(b) - } - pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTimelineId { - let mut arr = [0u8; 16]; - buf.copy_to_slice(&mut arr); - ZTimelineId::from(arr) + ZTimelineId(ZId::get_from_buf(buf)) } pub fn as_arr(&self) -> [u8; 16] { - self.0 + self.0.as_arr() + } + + pub fn generate() -> Self { + ZTimelineId(ZId::generate()) } } impl fmt::Display for ZTimelineId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&hex::encode(self.0)) + self.0.fmt(f) + } +} + +// Zenith Tenant Id represents identifiar of a particular tenant. +// Is used for distinguishing requests and data belonging to different users. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] +pub struct ZTenantId(ZId); + +impl FromStr for ZTenantId { + type Err = hex::FromHexError; + + fn from_str(s: &str) -> Result { + let value = ZId::from_str(s)?; + Ok(ZTenantId(value)) + } +} + +impl From<[u8; 16]> for ZTenantId { + fn from(b: [u8; 16]) -> Self { + ZTenantId(ZId::from(b)) + } +} + +impl FromHex for ZTenantId { + type Error = hex::FromHexError; + + fn from_hex>(hex: T) -> Result { + Ok(ZTenantId(ZId::from_hex(hex)?)) + } +} + +impl AsRef<[u8]> for ZTenantId { + fn as_ref(&self) -> &[u8] { + &self.0 .0 + } +} + +impl ZTenantId { + pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTenantId { + ZTenantId(ZId::get_from_buf(buf)) + } + + pub fn as_arr(&self) -> [u8; 16] { + self.0.as_arr() + } + + pub fn generate() -> Self { + ZTenantId(ZId::generate()) + } +} + +impl fmt::Display for ZTenantId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) } } diff --git a/pageserver/src/logger.rs b/pageserver/src/logger.rs index fe3ed381f2..b1a98bbb7e 100644 --- a/pageserver/src/logger.rs +++ b/pageserver/src/logger.rs @@ -1,14 +1,13 @@ use crate::{tui, PageServerConf}; -use std::fs::{OpenOptions, File}; use anyhow::{Context, Result}; use slog::{Drain, FnValue}; +use std::fs::{File, OpenOptions}; pub fn init_logging( conf: &PageServerConf, - log_filename: &str + log_filename: &str, ) -> Result<(slog_scope::GlobalLoggerGuard, File)> { - // Don't open the same file for output multiple times; // the different fds could overwrite each other's output. let log_file = OpenOptions::new() diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 366055ce6d..fbc607f24b 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -13,11 +13,11 @@ //! until we find the page we're looking for, making a separate lookup into the //! key-value store for each timeline. 
-use crate::object_key::*; use crate::object_store::ObjectStore; use crate::repository::*; use crate::restore_local_repo::import_timeline_wal; use crate::walredo::WalRedoManager; +use crate::{object_key::*, ZTenantId}; use crate::{PageServerConf, ZTimelineId}; use anyhow::{bail, Context, Result}; use bytes::Bytes; @@ -42,6 +42,7 @@ pub struct ObjectRepository { conf: &'static PageServerConf, timelines: Mutex>>, walredo_mgr: Arc, + tenantid: ZTenantId, } // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. @@ -52,12 +53,14 @@ impl ObjectRepository { conf: &'static PageServerConf, obj_store: Arc, walredo_mgr: Arc, + tenantid: ZTenantId, ) -> ObjectRepository { ObjectRepository { conf, obj_store, timelines: Mutex::new(HashMap::new()), walredo_mgr, + tenantid, } } } @@ -82,7 +85,7 @@ impl Repository for ObjectRepository { timelineid, timeline.get_last_record_lsn() ); - let wal_dir = self.conf.timeline_path(timelineid).join("wal"); + let wal_dir = self.conf.wal_dir_path(&timelineid, &self.tenantid); import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?; let timeline_rc = Arc::new(timeline); diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index add3e3ba7f..4e49cbef3e 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -1,35 +1,51 @@ //! This module acts as a switchboard to access different repositories managed by this -//! page server. Currently, a Page Server can only manage one repository, so there -//! isn't much here. If we implement multi-tenancy, this will probably be changed into -//! a hash map, keyed by the tenant ID. +//! page server. 
use crate::object_repository::ObjectRepository; use crate::repository::Repository; use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::PostgresRedoManager; -use crate::PageServerConf; +use crate::{PageServerConf, ZTenantId}; +use anyhow::{anyhow, Result}; use lazy_static::lazy_static; +use log::info; +use std::collections::HashMap; +use std::fs; +use std::str::FromStr; use std::sync::{Arc, Mutex}; lazy_static! { - pub static ref REPOSITORY: Mutex>> = Mutex::new(None); + pub static ref REPOSITORY: Mutex>> = + Mutex::new(HashMap::new()); } pub fn init(conf: &'static PageServerConf) { let mut m = REPOSITORY.lock().unwrap(); - let obj_store = RocksObjectStore::open(conf).unwrap(); + for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() { + let tenantid = + ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap(); + let obj_store = RocksObjectStore::open(conf, &tenantid).unwrap(); - // Set up a WAL redo manager, for applying WAL records. - let walredo_mgr = PostgresRedoManager::new(conf); + // Set up a WAL redo manager, for applying WAL records. + let walredo_mgr = PostgresRedoManager::new(conf, tenantid); - // we have already changed current dir to the repository. - let repo = ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr)); - - *m = Some(Arc::new(repo)); + // Set up an object repository, for actual data storage. 
+ let repo = + ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr), tenantid); + info!("initialized storage for tenant: {}", &tenantid); + m.insert(tenantid, Arc::new(repo)); + } } -pub fn get_repository() -> Arc { +pub fn insert_repository_for_tenant(tenantid: ZTenantId, repo: Arc) { + let o = &mut REPOSITORY.lock().unwrap(); + o.insert(tenantid, repo); +} + +pub fn get_repository_for_tenant(tenantid: &ZTenantId) -> Result> { let o = &REPOSITORY.lock().unwrap(); - Arc::clone(o.as_ref().unwrap()) + o.get(tenantid) + .map(|repo| Arc::clone(repo)) + .ok_or_else(|| anyhow!("repository not found for tenant name {}", tenantid)) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 8ff742fbfc..01c9c8e4b1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -10,13 +10,14 @@ // *callmemaybe $url* -- ask pageserver to start walreceiver on $url // -use anyhow::{anyhow, bail}; +use anyhow::{anyhow, bail, ensure}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use regex::Regex; use std::io::Write; use std::net::TcpListener; use std::str::FromStr; +use std::sync::Arc; use std::thread; use std::{io, net::TcpStream}; use zenith_utils::postgres_backend::PostgresBackend; @@ -33,7 +34,9 @@ use crate::page_cache; use crate::repository::{BufferTag, Modification, RelTag}; use crate::restore_local_repo; use crate::walreceiver; +use crate::walredo::PostgresRedoManager; use crate::PageServerConf; +use crate::ZTenantId; use crate::ZTimelineId; // Wrapped in libpq CopyData @@ -180,9 +183,10 @@ impl PageServerHandler { &self, pgb: &mut PostgresBackend, timelineid: ZTimelineId, + tenantid: ZTenantId, ) -> anyhow::Result<()> { // Check that the timeline exists - let repository = page_cache::get_repository(); + let repository = page_cache::get_repository_for_tenant(&tenantid)?; let timeline = repository.get_timeline(timelineid).map_err(|_| { anyhow!( "client requested pagestream on timeline {} which does 
not exist in page server", @@ -274,9 +278,10 @@ impl PageServerHandler { pgb: &mut PostgresBackend, timelineid: ZTimelineId, lsn: Option, + tenantid: ZTenantId, ) -> anyhow::Result<()> { // check that the timeline exists - let repository = page_cache::get_repository(); + let repository = page_cache::get_repository_for_tenant(&tenantid)?; let timeline = repository.get_timeline(timelineid).map_err(|e| { error!("error fetching timeline: {:?}", e); anyhow!( @@ -292,14 +297,16 @@ impl PageServerHandler { // find latest snapshot let snapshot_lsn = - restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); + restore_local_repo::find_latest_snapshot(&self.conf, &timelineid, &tenantid).unwrap(); let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn()); { let mut writer = CopyDataSink { pgb }; let mut basebackup = basebackup::Basebackup::new( + self.conf, &mut writer, + tenantid, timelineid, &timeline, req_lsn, @@ -328,84 +335,102 @@ impl postgres_backend::Handler for PageServerHandler { if query_string.last() == Some(&0) { query_string.truncate(query_string.len() - 1); } + let query_string = std::str::from_utf8(&query_string)?; - if query_string.starts_with(b"controlfile") { + if query_string.starts_with("controlfile") { self.handle_controlfile(pgb)?; - } else if query_string.starts_with(b"pagestream ") { - let (_l, r) = query_string.split_at("pagestream ".len()); - let timelineid_str = String::from_utf8(r.to_vec())?; - let timelineid = ZTimelineId::from_str(&timelineid_str)?; + } else if query_string.starts_with("pagestream ") { + let (_, params_raw) = query_string.split_at("pagestream ".len()); + let params = params_raw.split(" ").collect::>(); + ensure!( + params.len() == 2, + "invalid param number for pagestream command" + ); + let tenantid = ZTenantId::from_str(params[0])?; + let timelineid = ZTimelineId::from_str(params[1])?; - self.handle_pagerequests(pgb, timelineid)?; - } else if query_string.starts_with(b"basebackup ") { - let (_l, r) = 
query_string.split_at("basebackup ".len()); - let r = r.to_vec(); - let basebackup_args = String::from(String::from_utf8(r)?.trim_end()); - let args: Vec<&str> = basebackup_args.rsplit(' ').collect(); - let timelineid_str = args[0]; - info!("got basebackup command: \"{}\"", timelineid_str); - let timelineid = ZTimelineId::from_str(&timelineid_str)?; - let lsn = if args.len() > 1 { - Some(Lsn::from_str(args[1])?) + self.handle_pagerequests(pgb, timelineid, tenantid)?; + } else if query_string.starts_with("basebackup ") { + let (_, params_raw) = query_string.split_at("basebackup ".len()); + let params = params_raw.split(" ").collect::>(); + ensure!( + params.len() == 2, + "invalid param number for basebackup command" + ); + + let tenantid = ZTenantId::from_str(params[0])?; + let timelineid = ZTimelineId::from_str(params[1])?; + + // TODO are there any tests with lsn option? + let lsn = if params.len() == 3 { + Some(Lsn::from_str(params[2])?) } else { None }; + info!( + "got basebackup command. 
tenantid=\"{}\" timelineid=\"{}\" lsn=\"{:#?}\"", + tenantid, timelineid, lsn + ); + // Check that the timeline exists - self.handle_basebackup_request(pgb, timelineid, lsn)?; + self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"callmemaybe ") { - let query_str = String::from_utf8(query_string.to_vec())?; - - // callmemaybe + } else if query_string.starts_with("callmemaybe ") { + // callmemaybe // TODO lazy static - let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) (.*)$").unwrap(); + let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap(); let caps = re - .captures(&query_str) - .ok_or_else(|| anyhow!("invalid callmemaybe: '{}'", query_str))?; + .captures(query_string) + .ok_or_else(|| anyhow!("invalid callmemaybe: '{}'", query_string))?; - let timelineid = ZTimelineId::from_str(caps.get(1).unwrap().as_str())?; - let connstr: String = String::from(caps.get(2).unwrap().as_str()); + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let connstr = caps.get(3).unwrap().as_str().to_owned(); // Check that the timeline exists - let repository = page_cache::get_repository(); + let repository = page_cache::get_repository_for_tenant(&tenantid)?; if repository.get_timeline(timelineid).is_err() { bail!("client requested callmemaybe on timeline {} which does not exist in page server", timelineid); } - walreceiver::launch_wal_receiver(&self.conf, timelineid, &connstr); + walreceiver::launch_wal_receiver(&self.conf, timelineid, &connstr, tenantid.to_owned()); pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"branch_create ") { - let query_str = String::from_utf8(query_string.to_vec())?; - let err = || anyhow!("invalid branch_create: '{}'", query_str); + } else if 
query_string.starts_with("branch_create ") { + let err = || anyhow!("invalid branch_create: '{}'", query_string); - // branch_create + // branch_create // TODO lazy static // TOOD: escaping, to allow branch names with spaces - let re = Regex::new(r"^branch_create (\S+) ([^\r\n\s;]+)[\r\n\s;]*;?$").unwrap(); - let caps = re.captures(&query_str).ok_or_else(err)?; + let re = Regex::new(r"^branch_create ([[:xdigit:]]+) (\S+) ([^\r\n\s;]+)[\r\n\s;]*;?$") + .unwrap(); + let caps = re.captures(&query_string).ok_or_else(err)?; - let branchname: String = String::from(caps.get(1).ok_or_else(err)?.as_str()); - let startpoint_str: String = String::from(caps.get(2).ok_or_else(err)?.as_str()); + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let branchname = caps.get(2).ok_or_else(err)?.as_str().to_owned(); + let startpoint_str = caps.get(3).ok_or_else(err)?.as_str().to_owned(); - let branch = branches::create_branch(&self.conf, &branchname, &startpoint_str)?; + let branch = + branches::create_branch(&self.conf, &branchname, &startpoint_str, &tenantid)?; let branch = serde_json::to_vec(&branch)?; pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? .write_message_noflush(&BeMessage::DataRow(&[Some(&branch)]))? .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"push ") { - let query_str = std::str::from_utf8(&query_string)?; - let mut it = query_str.split(' '); - it.next().unwrap(); - let timeline_id: ZTimelineId = it - .next() - .ok_or_else(|| anyhow!("missing timeline id"))? 
- .parse()?; + } else if query_string.starts_with("push ") { + // push + let re = Regex::new(r"^push ([[:xdigit:]]+) ([[:xdigit:]]+)$").unwrap(); + + let caps = re + .captures(query_string) + .ok_or_else(|| anyhow!("invalid push: '{}'", query_string))?; + + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; let start_lsn = Lsn(0); // TODO this needs to come from the repo - let timeline = - page_cache::get_repository().create_empty_timeline(timeline_id, start_lsn)?; + let timeline = page_cache::get_repository_for_tenant(&tenantid)? + .create_empty_timeline(timelineid, start_lsn)?; pgb.write_message(&BeMessage::CopyInResponse)?; @@ -433,22 +458,23 @@ impl postgres_backend::Handler for PageServerHandler { } pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"request_push ") { - let query_str = std::str::from_utf8(&query_string)?; - let mut it = query_str.split(' '); - it.next().unwrap(); + } else if query_string.starts_with("request_push ") { + // request_push + let re = Regex::new(r"^request_push ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap(); - let timeline_id: ZTimelineId = it - .next() - .ok_or_else(|| anyhow!("missing timeline id"))? 
- .parse()?; - let timeline = page_cache::get_repository().get_timeline(timeline_id)?; + let caps = re + .captures(query_string) + .ok_or_else(|| anyhow!("invalid request_push: '{}'", query_string))?; - let postgres_connection_uri = - it.next().ok_or_else(|| anyhow!("missing postgres uri"))?; + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let postgres_connection_uri = caps.get(3).unwrap().as_str(); + + let timeline = + page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?; let mut conn = postgres::Client::connect(postgres_connection_uri, postgres::NoTls)?; - let mut copy_in = conn.copy_in(format!("push {}", timeline_id.to_string()).as_str())?; + let mut copy_in = conn.copy_in(format!("push {}", timelineid.to_string()).as_str())?; let history = timeline.history()?; for update_res in history { @@ -461,44 +487,76 @@ impl postgres_backend::Handler for PageServerHandler { copy_in.finish()?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"branch_list") { - let branches = crate::branches::get_branches(&self.conf)?; + } else if query_string.starts_with("branch_list ") { + // branch_list + let re = Regex::new(r"^branch_list ([[:xdigit:]]+)$").unwrap(); + let caps = re + .captures(query_string) + .ok_or_else(|| anyhow!("invalid branch_list: '{}'", query_string))?; + + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + + let branches = crate::branches::get_branches(&self.conf, &tenantid)?; let branches_buf = serde_json::to_vec(&branches)?; pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? .write_message_noflush(&BeMessage::DataRow(&[Some(&branches_buf)]))? 
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"status") { + } else if query_string.starts_with("tenant_list") { + let tenants = crate::branches::get_tenants(&self.conf)?; + let tenants_buf = serde_json::to_vec(&tenants)?; + + pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? + .write_message_noflush(&BeMessage::DataRow(&[Some(&tenants_buf)]))? + .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } else if query_string.starts_with("tenant_create") { + let err = || anyhow!("invalid tenant_create: '{}'", query_string); + + // tenant_create + let re = Regex::new(r"^tenant_create ([[:xdigit:]]+)$").unwrap(); + let caps = re.captures(&query_string).ok_or_else(err)?; + + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let wal_redo_manager = Arc::new(PostgresRedoManager::new(self.conf, tenantid)); + let repo = branches::create_repo(self.conf, tenantid, wal_redo_manager)?; + page_cache::insert_repository_for_tenant(tenantid, Arc::new(repo)); + + pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? + .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } else if query_string.starts_with("status") { pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? .write_message_noflush(&HELLO_WORLD_ROW)? .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.to_ascii_lowercase().starts_with(b"set ") { + } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'" // on connect pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with(b"do_gc ") { + } else if query_string.starts_with("do_gc ") { // Run GC immediately on given timeline. // FIXME: This is just for tests. See test_runner/batch_others/test_gc.py. 
// This probably should require special authentication or a global flag to // enable, I don't think we want to or need to allow regular clients to invoke // GC. - let query_str = std::str::from_utf8(&query_string)?; - let mut it = query_str.split(' '); - it.next().unwrap(); + // do_gc + let re = Regex::new(r"^do_gc ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)([[:digit:]]+)?") + .unwrap(); - let timeline_id: ZTimelineId = it - .next() - .ok_or_else(|| anyhow!("missing timeline id"))? - .parse()?; - let timeline = page_cache::get_repository().get_timeline(timeline_id)?; + let caps = re + .captures(query_string) + .ok_or_else(|| anyhow!("invalid do_gc: '{}'", query_string))?; - let horizon: u64 = it - .next() - .unwrap_or(&self.conf.gc_horizon.to_string()) - .parse()?; + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let gc_horizon: u64 = caps + .get(4) + .map(|h| h.as_str().parse()) + .unwrap_or(Ok(self.conf.gc_horizon))?; - let result = timeline.gc_iteration(horizon, true)?; + let timeline = + page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?; + + let result = timeline.gc_iteration(gc_horizon, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor { diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index c4b507c0f2..ffa7cf4fe6 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -31,6 +31,7 @@ pub trait Repository: Send + Sync { /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>; + // TODO get timelines? //fn get_stats(&self) -> RepositoryStats; } @@ -144,13 +145,13 @@ pub trait Timeline: Send + Sync { /// but it can be explicitly requested through page server API. /// /// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval). 
- /// `compact` parameter is used to force compaction of storage. - /// Some storage implementation are based on LSM tree and require periodic merge (compaction). - /// Usually storage implementation determines itself when compaction should be performed. - /// But for GC tests it way be useful to force compaction just after completion of GC iteration - /// to make sure that all detected garbage is removed. - /// So right now `compact` is set to true when GC explicitly requested through page srver API, - /// and is st to false in GC threads which infinitely repeats GC iterations in loop. + /// `compact` parameter is used to force compaction of storage. + /// Some storage implementation are based on LSM tree and require periodic merge (compaction). + /// Usually storage implementation determines itself when compaction should be performed. + /// But for GC tests it way be useful to force compaction just after completion of GC iteration + /// to make sure that all detected garbage is removed. + /// So right now `compact` is set to true when GC explicitly requested through page srver API, + /// and is st to false in GC threads which infinitely repeats GC iterations in loop. 
fn gc_iteration(&self, horizon: u64, compact: bool) -> Result; // Check transaction status @@ -311,7 +312,7 @@ mod tests { use crate::object_repository::{ObjectValue, PageEntry, RelationSizeEntry}; use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::{WalRedoError, WalRedoManager}; - use crate::PageServerConf; + use crate::{PageServerConf, ZTenantId}; use postgres_ffi::pg_constants; use std::fs; use std::path::PathBuf; @@ -356,7 +357,7 @@ mod tests { fn get_test_repo(test_name: &str) -> Result> { let repo_dir = PathBuf::from(format!("../tmp_check/test_{}", test_name)); let _ = fs::remove_dir_all(&repo_dir); - fs::create_dir_all(&repo_dir)?; + fs::create_dir_all(&repo_dir).unwrap(); let conf = PageServerConf { daemonize: false, @@ -371,12 +372,15 @@ mod tests { // Make a static copy of the config. This can never be free'd, but that's // OK in a test. let conf: &'static PageServerConf = Box::leak(Box::new(conf)); + let tenantid = ZTenantId::generate(); + fs::create_dir_all(conf.tenant_path(&tenantid)).unwrap(); - let obj_store = RocksObjectStore::create(conf)?; + let obj_store = RocksObjectStore::create(conf, &tenantid)?; let walredo_mgr = TestRedoManager {}; - let repo = ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr)); + let repo = + ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr), tenantid); Ok(Box::new(repo)) } diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index ad5e7cb298..61e839d5db 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -18,6 +18,7 @@ use crate::object_key::*; use crate::repository::*; use crate::waldecoder::*; use crate::PageServerConf; +use crate::ZTenantId; use crate::ZTimelineId; use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; @@ -30,9 +31,12 @@ const MAX_MBR_BLKNO: u32 = /// /// Find latest snapshot in a timeline's 'snapshots' directory /// -pub fn find_latest_snapshot(_conf: 
&PageServerConf, timeline: ZTimelineId) -> Result { - let snapshotspath = format!("timelines/{}/snapshots", timeline); - +pub fn find_latest_snapshot( + conf: &PageServerConf, + timelineid: &ZTimelineId, + tenantid: &ZTenantId, +) -> Result { + let snapshotspath = conf.snapshots_path(timelineid, tenantid); let mut last_snapshot_lsn = Lsn(0); for direntry in fs::read_dir(&snapshotspath).unwrap() { let filename = direntry.unwrap().file_name(); @@ -45,7 +49,10 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re } if last_snapshot_lsn == Lsn(0) { - error!("could not find valid snapshot in {}", &snapshotspath); + error!( + "could not find valid snapshot in {}", + snapshotspath.display() + ); // TODO return error? } Ok(last_snapshot_lsn) diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs index 3164def5a9..f13b478d05 100644 --- a/pageserver/src/rocksdb_storage.rs +++ b/pageserver/src/rocksdb_storage.rs @@ -5,6 +5,7 @@ use crate::object_key::*; use crate::object_store::ObjectStore; use crate::repository::RelTag; use crate::PageServerConf; +use crate::ZTenantId; use crate::ZTimelineId; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; @@ -241,26 +242,30 @@ impl ObjectStore for RocksObjectStore { impl RocksObjectStore { /// Open a RocksDB database. - pub fn open(conf: &'static PageServerConf) -> Result { + pub fn open(conf: &'static PageServerConf, tenantid: &ZTenantId) -> Result { let opts = Self::get_rocksdb_opts(); - let obj_store = Self::new(conf, opts)?; + let obj_store = Self::new(conf, opts, tenantid)?; Ok(obj_store) } /// Create a new, empty RocksDB database. 
- pub fn create(conf: &'static PageServerConf) -> Result { - let path = conf.workdir.join("rocksdb-storage"); + pub fn create(conf: &'static PageServerConf, tenantid: &ZTenantId) -> Result { + let path = conf.tenant_path(&tenantid).join("rocksdb-storage"); std::fs::create_dir(&path)?; let mut opts = Self::get_rocksdb_opts(); opts.create_if_missing(true); opts.set_error_if_exists(true); - let obj_store = Self::new(conf, opts)?; + let obj_store = Self::new(conf, opts, tenantid)?; Ok(obj_store) } - fn new(conf: &'static PageServerConf, mut opts: rocksdb::Options) -> Result { - let path = conf.workdir.join("rocksdb-storage"); + fn new( + conf: &'static PageServerConf, + mut opts: rocksdb::Options, + tenantid: &ZTenantId, + ) -> Result { + let path = conf.tenant_path(&tenantid).join("rocksdb-storage"); let gc = Arc::new(GarbageCollector::new()); let gc_ref = gc.clone(); opts.set_compaction_filter("ttl", move |_level: u32, key: &[u8], _val: &[u8]| { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 52c5f0e470..abd3218e86 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -9,6 +9,7 @@ use crate::page_cache; use crate::restore_local_repo; use crate::waldecoder::*; use crate::PageServerConf; +use crate::ZTenantId; use crate::ZTimelineId; use anyhow::{Error, Result}; use lazy_static::lazy_static; @@ -25,7 +26,6 @@ use std::collections::HashMap; use std::fs; use std::fs::{File, OpenOptions}; use std::io::{Seek, SeekFrom, Write}; -use std::path::PathBuf; use std::str::FromStr; use std::sync::Mutex; use std::thread; @@ -50,6 +50,7 @@ pub fn launch_wal_receiver( conf: &'static PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str, + tenantid: ZTenantId, ) { let mut receivers = WAL_RECEIVERS.lock().unwrap(); @@ -67,7 +68,7 @@ pub fn launch_wal_receiver( let _walreceiver_thread = thread::Builder::new() .name("WAL receiver thread".into()) .spawn(move || { - thread_main(conf, timelineid); + thread_main(conf, 
timelineid, &tenantid); }) .unwrap(); } @@ -88,7 +89,7 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String { // // This is the entry point for the WAL receiver thread. // -fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId) { +fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: &ZTenantId) { info!( "WAL receiver thread started for timeline : '{}'", timelineid @@ -102,7 +103,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId) { // Look up the current WAL producer address let wal_producer_connstr = get_wal_producer_connstr(timelineid); - let res = walreceiver_main(conf, timelineid, &wal_producer_connstr); + let res = walreceiver_main(conf, timelineid, &wal_producer_connstr, tenantid); if let Err(e) = res { info!( @@ -115,9 +116,10 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId) { } fn walreceiver_main( - _conf: &PageServerConf, + conf: &PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str, + tenantid: &ZTenantId, ) -> Result<(), Error> { // Connect to the database in replication mode. 
info!("connecting to {:?}", wal_producer_connstr); @@ -134,7 +136,7 @@ fn walreceiver_main( let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; - let repository = page_cache::get_repository(); + let repository = page_cache::get_repository_for_tenant(tenantid)?; let timeline = repository.get_timeline(timelineid).unwrap(); // @@ -183,7 +185,14 @@ fn walreceiver_main( let endlsn = startlsn + data.len() as u64; let prev_last_rec_lsn = last_rec_lsn; - write_wal_file(startlsn, timelineid, pg_constants::WAL_SEGMENT_SIZE, data)?; + write_wal_file( + conf, + startlsn, + &timelineid, + pg_constants::WAL_SEGMENT_SIZE, + data, + tenantid, + )?; trace!("received XLogData between {} and {}", startlsn, endlsn); @@ -237,9 +246,11 @@ fn walreceiver_main( { info!("switched segment {} to {}", prev_last_rec_lsn, last_rec_lsn); let (oldest_segno, newest_segno) = find_wal_file_range( - timelineid, + conf, + &timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn, + tenantid, )?; if newest_segno - oldest_segno >= 10 { @@ -296,16 +307,18 @@ fn walreceiver_main( } fn find_wal_file_range( - timeline: ZTimelineId, + conf: &PageServerConf, + timeline: &ZTimelineId, wal_seg_size: usize, written_upto: Lsn, + tenant: &ZTenantId, ) -> Result<(u64, u64)> { let written_upto_segno = written_upto.segment_number(wal_seg_size); let mut oldest_segno = written_upto_segno; let mut newest_segno = written_upto_segno; // Scan the wal directory, and count how many WAL filed we could remove - let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline)); + let wal_dir = conf.wal_dir_path(timeline, tenant); for entry in fs::read_dir(wal_dir)? 
{ let entry = entry?; let path = entry.path(); @@ -382,10 +395,12 @@ pub fn identify_system(client: &mut Client) -> Result { } fn write_wal_file( + conf: &PageServerConf, startpos: Lsn, - timeline: ZTimelineId, + timelineid: &ZTimelineId, wal_seg_size: usize, buf: &[u8], + tenantid: &ZTenantId, ) -> anyhow::Result<()> { let mut bytes_left: usize = buf.len(); let mut bytes_written: usize = 0; @@ -393,7 +408,7 @@ fn write_wal_file( let mut start_pos = startpos; const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline)); + let wal_dir = conf.wal_dir_path(timelineid, tenantid); /* Extract WAL location for this block */ let mut xlogoff = start_pos.segment_offset(wal_seg_size); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e43b9c354a..102c8eb347 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -42,6 +42,7 @@ use crate::repository::WALRecord; use crate::waldecoder::XlXactParsedRecord; use crate::waldecoder::{MultiXactId, XlMultiXactCreate}; use crate::PageServerConf; +use crate::ZTenantId; use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; use postgres_ffi::XLogRecord; @@ -100,6 +101,8 @@ struct PostgresRedoManagerInternal { conf: &'static PageServerConf, request_rx: mpsc::Receiver, + + tenantid: ZTenantId, } #[derive(Debug)] @@ -131,7 +134,7 @@ impl PostgresRedoManager { /// Create a new PostgresRedoManager. /// /// This launches a new thread to handle the requests. 
- pub fn new(conf: &'static PageServerConf) -> PostgresRedoManager { + pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager { let (tx, rx) = mpsc::channel(); // @@ -146,7 +149,11 @@ impl PostgresRedoManager { let _walredo_thread = std::thread::Builder::new() .name("WAL redo thread".into()) .spawn(move || { - let mut internal = PostgresRedoManagerInternal { conf, request_rx }; + let mut internal = PostgresRedoManagerInternal { + conf, + request_rx, + tenantid, + }; internal.wal_redo_main(); }) .unwrap(); @@ -219,7 +226,7 @@ impl PostgresRedoManagerInternal { // Main entry point for the WAL applicator thread. // fn wal_redo_main(&mut self) { - info!("WAL redo thread started"); + info!("WAL redo thread started for tenant: {}", self.tenantid); // We block on waiting for requests on the walredo request channel, but // use async I/O to communicate with the child process. Initialize the @@ -231,10 +238,13 @@ impl PostgresRedoManagerInternal { let process: PostgresRedoProcess; - info!("launching WAL redo postgres process"); + info!( + "launching WAL redo postgres process for tenant: {}", + self.tenantid + ); process = runtime - .block_on(PostgresRedoProcess::launch(self.conf)) + .block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid)) .unwrap(); // Loop forever, handling requests as they come. @@ -454,11 +464,14 @@ impl PostgresRedoProcess { // // Start postgres binary in special WAL redo mode. // - async fn launch(conf: &PageServerConf) -> Result { + async fn launch( + conf: &PageServerConf, + tenantid: &ZTenantId, + ) -> Result { // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // just create one with constant name. That fails if you try to launch more than // one WAL redo manager concurrently. - let datadir = conf.workdir.join("wal-redo-datadir"); + let datadir = conf.tenant_path(&tenantid).join("wal-redo-datadir"); // Create empty data directory for wal-redo postgres, deleting old one first. 
if datadir.exists() { diff --git a/test_runner/batch_others/test_branch_behind.py b/test_runner/batch_others/test_branch_behind.py index bcb66cb1f3..6b9e91a258 100644 --- a/test_runner/batch_others/test_branch_behind.py +++ b/test_runner/batch_others/test_branch_behind.py @@ -1,10 +1,13 @@ +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + + pytest_plugins = ("fixtures.zenith_fixtures") # # Create a couple of branches off the main branch, at a historical point in time. # -def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin): +def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): # Branch at the point where only 100 rows were inserted zenith_cli.run(["branch", "test_branch_behind", "empty"]) diff --git a/test_runner/batch_others/test_config.py b/test_runner/batch_others/test_config.py index 00c27b1ac1..d8cc798839 100644 --- a/test_runner/batch_others/test_config.py +++ b/test_runner/batch_others/test_config.py @@ -1,12 +1,14 @@ from contextlib import closing +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + pytest_plugins = ("fixtures.zenith_fixtures") # # Test starting Postgres with custom options # -def test_config(zenith_cli, pageserver, postgres, pg_bin): +def test_config(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): # Create a branch for us zenith_cli.run(["branch", "test_config", "empty"]) diff --git a/test_runner/batch_others/test_createdb.py b/test_runner/batch_others/test_createdb.py index bfc2224e05..7e582ccb3d 100644 --- a/test_runner/batch_others/test_createdb.py +++ b/test_runner/batch_others/test_createdb.py @@ -1,4 +1,5 @@ from contextlib import closing +from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory, ZenithCli pytest_plugins = ("fixtures.zenith_fixtures") @@ -6,7 +7,12 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test CREATE DATABASE when there have been relmapper changes # -def 
test_createdb(zenith_cli, pageserver, postgres, pg_bin): +def test_createdb( + zenith_cli: ZenithCli, + pageserver: ZenithPageserver, + postgres: PostgresFactory, + pg_bin, +): zenith_cli.run(["branch", "test_createdb", "empty"]) pg = postgres.create_start('test_createdb') diff --git a/test_runner/batch_others/test_createuser.py b/test_runner/batch_others/test_createuser.py index b821a233d1..f44df91c3c 100644 --- a/test_runner/batch_others/test_createuser.py +++ b/test_runner/batch_others/test_createuser.py @@ -1,12 +1,14 @@ from contextlib import closing +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + pytest_plugins = ("fixtures.zenith_fixtures") # # Test CREATE USER to check shared catalog restore # -def test_createuser(zenith_cli, pageserver, postgres, pg_bin): +def test_createuser(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): zenith_cli.run(["branch", "test_createuser", "empty"]) pg = postgres.create_start('test_createuser') diff --git a/test_runner/batch_others/test_gc.py b/test_runner/batch_others/test_gc.py index 116e628c8f..12076a8418 100644 --- a/test_runner/batch_others/test_gc.py +++ b/test_runner/batch_others/test_gc.py @@ -1,4 +1,5 @@ from contextlib import closing +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver import psycopg2.extras pytest_plugins = ("fixtures.zenith_fixtures") @@ -9,7 +10,7 @@ pytest_plugins = ("fixtures.zenith_fixtures") # This test is pretty tightly coupled with the current implementation of page version storage # and garbage collection in object_repository.rs. 
# -def test_gc(zenith_cli, pageserver, postgres, pg_bin): +def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): zenith_cli.run(["branch", "test_gc", "empty"]) pg = postgres.create_start('test_gc') @@ -30,7 +31,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): # before running the actual tests below, otherwise the counts won't match # what we expect. print("Running GC before test") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) # remember the number of relations @@ -42,7 +43,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): n_relations += 1; print("Inserting one row and running GC") cur.execute("INSERT INTO foo VALUES (1)") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations @@ -55,7 +56,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): cur.execute("INSERT INTO foo VALUES (2)") cur.execute("INSERT INTO foo VALUES (3)") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations @@ -68,7 +69,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): print("Inserting one more row") cur.execute("INSERT INTO foo VALUES (3)") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, 
relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations @@ -77,7 +78,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): assert row['deleted'] == 1 # Run GC again, with no changes in the database. Should not remove anything. - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations @@ -90,7 +91,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): # cur.execute("DROP TABLE foo") - pscur.execute(f"do_gc {timeline} 0") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") row = pscur.fetchone() print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) # Each relation fork is counted separately, hence 3. diff --git a/test_runner/batch_others/test_multixact.py b/test_runner/batch_others/test_multixact.py index f64be25bcf..49c6e4aa66 100644 --- a/test_runner/batch_others/test_multixact.py +++ b/test_runner/batch_others/test_multixact.py @@ -1,3 +1,5 @@ +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + pytest_plugins = ("fixtures.zenith_fixtures") @@ -7,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures") # it only checks next_multixact_id field in restored pg_control, # since we don't have functions to check multixact internals. 
# -def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir): +def test_multixact(pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin, zenith_cli, base_dir): # Create a branch for us zenith_cli.run(["branch", "test_multixact", "empty"]) pg = postgres.create_start('test_multixact') diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index e48c3b105d..4bc0c36a08 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -1,4 +1,8 @@ import json +import uuid +import pytest +import psycopg2 +from fixtures.zenith_fixtures import ZenithPageserver pytest_plugins = ("fixtures.zenith_fixtures") @@ -9,14 +13,14 @@ def test_status(pageserver): ] -def test_branch_list(pageserver, zenith_cli): +def test_branch_list(pageserver: ZenithPageserver, zenith_cli): # Create a branch for us zenith_cli.run(["branch", "test_branch_list_main", "empty"]) conn = pageserver.connect() cur = conn.cursor() - cur.execute('branch_list') + cur.execute(f'branch_list {pageserver.initial_tenant}') branches = json.loads(cur.fetchone()[0]) # Filter out branches created by other tests branches = [x for x in branches if x['name'].startswith('test_branch_list')] @@ -32,7 +36,7 @@ def test_branch_list(pageserver, zenith_cli): zenith_cli.run(['branch', 'test_branch_list_experimental', 'test_branch_list_main']) zenith_cli.run(['pg', 'create', 'test_branch_list_experimental']) - cur.execute('branch_list') + cur.execute(f'branch_list {pageserver.initial_tenant}') new_branches = json.loads(cur.fetchone()[0]) # Filter out branches created by other tests new_branches = [x for x in new_branches if x['name'].startswith('test_branch_list')] @@ -46,3 +50,27 @@ def test_branch_list(pageserver, zenith_cli): assert new_branches[1] == branches[0] conn.close() + + +def test_tenant_list(pageserver: ZenithPageserver, zenith_cli): + res = zenith_cli.run(["tenant", "list"]) + 
res.check_returncode() + tenants = res.stdout.splitlines() + assert tenants == [pageserver.initial_tenant] + + conn = pageserver.connect() + cur = conn.cursor() + + # check same tenant cannot be created twice + with pytest.raises(psycopg2.DatabaseError, match=f'repo for {pageserver.initial_tenant} already exists'): + cur.execute(f'tenant_create {pageserver.initial_tenant}') + + # create one more tenant + tenant1 = uuid.uuid4().hex + cur.execute(f'tenant_create {tenant1}') + + cur.execute('tenant_list') + + # compare tenants list + new_tenants = sorted(json.loads(cur.fetchone()[0])) + assert sorted([pageserver.initial_tenant, tenant1]) == new_tenants diff --git a/test_runner/batch_others/test_pgbench.py b/test_runner/batch_others/test_pgbench.py index 486dd77496..a5423cf3d7 100644 --- a/test_runner/batch_others/test_pgbench.py +++ b/test_runner/batch_others/test_pgbench.py @@ -1,8 +1,9 @@ +from fixtures.zenith_fixtures import PostgresFactory + pytest_plugins = ("fixtures.zenith_fixtures") -def test_pgbench(pageserver, postgres, pg_bin, zenith_cli): - +def test_pgbench(postgres: PostgresFactory, pg_bin, zenith_cli): # Create a branch for us zenith_cli.run(["branch", "test_pgbench", "empty"]) diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index dc3be94bfe..464d3ed3d7 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -1,4 +1,5 @@ from contextlib import closing +from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory pytest_plugins = ("fixtures.zenith_fixtures") @@ -6,7 +7,7 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test restarting and recreating a postgres instance # -def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin): +def test_restart_compute(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): zenith_cli.run(["branch", "test_restart_compute", "empty"]) pg = 
postgres.create_start('test_restart_compute') diff --git a/test_runner/batch_others/test_tenants.py b/test_runner/batch_others/test_tenants.py new file mode 100644 index 0000000000..ee6bb0bfd3 --- /dev/null +++ b/test_runner/batch_others/test_tenants.py @@ -0,0 +1,48 @@ +from contextlib import closing + +import pytest + +from fixtures.zenith_fixtures import ( + TenantFactory, + ZenithCli, + PostgresFactory, +) + + +@pytest.mark.parametrize('with_wal_acceptors', [False, True]) +def test_tenants_normal_work( + zenith_cli: ZenithCli, + tenant_factory: TenantFactory, + postgres: PostgresFactory, + wa_factory, + with_wal_acceptors: bool, +): + """Tests tenants with and without wal acceptors""" + tenant_1 = tenant_factory.create() + tenant_2 = tenant_factory.create() + + zenith_cli.run(["branch", f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", "main", f"--tenantid={tenant_1}"]) + zenith_cli.run(["branch", f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", "main", f"--tenantid={tenant_2}"]) + if with_wal_acceptors: + wa_factory.start_n_new(3) + + pg_tenant1 = postgres.create_start( + f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", + tenant_1, + wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, + ) + pg_tenant2 = postgres.create_start( + f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", + tenant_2, + wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, + ) + + for pg in [pg_tenant1, pg_tenant2]: + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + cur.execute("CREATE TABLE t(key int primary key, value text)") + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + cur.execute("SELECT sum(key) FROM t") + assert cur.fetchone() == (5000050000,) diff --git a/test_runner/batch_others/test_twophase.py 
b/test_runner/batch_others/test_twophase.py index d4ffb9dc6c..ab1cdf3001 100644 --- a/test_runner/batch_others/test_twophase.py +++ b/test_runner/batch_others/test_twophase.py @@ -1,10 +1,13 @@ +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + + pytest_plugins = ("fixtures.zenith_fixtures") # # Test branching, when a transaction is in prepared state # -def test_twophase(zenith_cli, pageserver, postgres, pg_bin): +def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): zenith_cli.run(["branch", "test_twophase", "empty"]) pg = postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5']) @@ -28,8 +31,10 @@ def test_twophase(zenith_cli, pageserver, postgres, pg_bin): # Create a branch with the transaction in prepared state zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"]) - pg2 = postgres.create_start('test_twophase_prepared', - config_lines=['max_prepared_transactions=5']) + pg2 = postgres.create_start( + 'test_twophase_prepared', + config_lines=['max_prepared_transactions=5'], + ) conn2 = pg2.connect() cur2 = conn2.cursor() diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index bf5a7f19e7..371a740f0f 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -4,13 +4,14 @@ import time from contextlib import closing from multiprocessing import Process, Value +from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory pytest_plugins = ("fixtures.zenith_fixtures") # basic test, write something in setup with wal acceptors, ensure that commits # succeed and data is written -def test_normal_work(zenith_cli, pageserver, postgres, wa_factory): +def test_normal_work(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory): zenith_cli.run(["branch", "test_wal_acceptors_normal_work", "empty"]) wa_factory.start_n_new(3) pg = 
postgres.create_start('test_wal_acceptors_normal_work', @@ -28,7 +29,7 @@ def test_normal_work(zenith_cli, pageserver, postgres, wa_factory): # Run page server and multiple acceptors, and multiple compute nodes running # against different timelines. -def test_many_timelines(zenith_cli, pageserver, postgres, wa_factory): +def test_many_timelines(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory): n_timelines = 2 wa_factory.start_n_new(3) @@ -60,7 +61,7 @@ def test_many_timelines(zenith_cli, pageserver, postgres, wa_factory): # Check that dead minority doesn't prevent the commits: execute insert n_inserts # times, with fault_probability chance of getting a wal acceptor down or up # along the way. 2 of 3 are always alive, so the work keeps going. -def test_restarts(zenith_cli, pageserver, postgres, wa_factory): +def test_restarts(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory): fault_probability = 0.01 n_inserts = 1000 n_acceptors = 3 @@ -101,7 +102,7 @@ def delayed_wal_acceptor_start(wa): # When majority of acceptors is offline, commits are expected to be frozen -def test_unavailability(zenith_cli, pageserver, postgres, wa_factory): +def test_unavailability(zenith_cli, postgres: PostgresFactory, wa_factory): wa_factory.start_n_new(2) zenith_cli.run(["branch", "test_wal_acceptors_unavailability", "empty"]) @@ -171,7 +172,7 @@ def stop_value(): # do inserts while concurrently getting up/down subsets of acceptors -def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_value): +def test_race_conditions(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory, stop_value): wa_factory.start_n_new(3) diff --git a/test_runner/batch_others/test_zenith_cli.py b/test_runner/batch_others/test_zenith_cli.py index 987a38753a..be9e2b07fd 100644 --- a/test_runner/batch_others/test_zenith_cli.py +++ b/test_runner/batch_others/test_zenith_cli.py @@ -1,43 +1,50 @@ import json 
+import uuid + +from fixtures.zenith_fixtures import ZenithCli, ZenithPageserver pytest_plugins = ("fixtures.zenith_fixtures") -def helper_compare_branch_list(page_server_cur, zenith_cli): +def helper_compare_branch_list(page_server_cur, zenith_cli, initial_tenant: str): """ Compare branches list returned by CLI and directly via API. Filters out branches created by other tests. """ - page_server_cur.execute('branch_list') + page_server_cur.execute(f'branch_list {initial_tenant}') branches_api = sorted(map(lambda b: b['name'], json.loads(page_server_cur.fetchone()[0]))) branches_api = [b for b in branches_api if b.startswith('test_cli_') or b in ('empty', 'main')] res = zenith_cli.run(["branch"]) - assert res.stderr == '' + res.check_returncode() branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n"))) branches_cli = [b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')] - assert branches_api == branches_cli + res = zenith_cli.run(["branch", f"--tenantid={initial_tenant}"]) + res.check_returncode() + branches_cli_with_tenant_arg = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n"))) + branches_cli_with_tenant_arg = [b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')] + + assert branches_api == branches_cli == branches_cli_with_tenant_arg -def test_cli_branch_list(pageserver, zenith_cli): - +def test_cli_branch_list(pageserver: ZenithPageserver, zenith_cli): page_server_conn = pageserver.connect() page_server_cur = page_server_conn.cursor() # Initial sanity check - helper_compare_branch_list(page_server_cur, zenith_cli) + helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant) # Create a branch for us res = zenith_cli.run(["branch", "test_cli_branch_list_main", "main"]) assert res.stderr == '' - helper_compare_branch_list(page_server_cur, zenith_cli) + helper_compare_branch_list(page_server_cur, zenith_cli, 
pageserver.initial_tenant) # Create a nested branch res = zenith_cli.run(["branch", "test_cli_branch_list_nested", "test_cli_branch_list_main"]) assert res.stderr == '' - helper_compare_branch_list(page_server_cur, zenith_cli) + helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant) # Check that all new branches are visible via CLI res = zenith_cli.run(["branch"]) @@ -46,3 +53,45 @@ def test_cli_branch_list(pageserver, zenith_cli): assert 'test_cli_branch_list_main' in branches_cli assert 'test_cli_branch_list_nested' in branches_cli + +def helper_compare_tenant_list(page_server_cur, zenith_cli: ZenithCli): + page_server_cur.execute(f'tenant_list') + tenants_api = sorted(json.loads(page_server_cur.fetchone()[0])) + + res = zenith_cli.run(["tenant", "list"]) + assert res.stderr == '' + tenants_cli = sorted(res.stdout.splitlines()) + + assert tenants_api == tenants_cli + + +def test_cli_tenant_list(pageserver: ZenithPageserver, zenith_cli: ZenithCli): + page_server_conn = pageserver.connect() + page_server_cur = page_server_conn.cursor() + + # Initial sanity check + helper_compare_tenant_list(page_server_cur, zenith_cli) + + # Create new tenant + tenant1 = uuid.uuid4().hex + res = zenith_cli.run(["tenant", "create", tenant1]) + res.check_returncode() + + # check tenant1 appeared + helper_compare_tenant_list(page_server_cur, zenith_cli) + + # Create new tenant + tenant2 = uuid.uuid4().hex + res = zenith_cli.run(["tenant", "create", tenant2]) + res.check_returncode() + + # check tenant2 appeared + helper_compare_tenant_list(page_server_cur, zenith_cli) + + res = zenith_cli.run(["tenant", "list"]) + res.check_returncode() + tenants = sorted(res.stdout.splitlines()) + + assert pageserver.initial_tenant in tenants + assert tenant1 in tenants + assert tenant2 in tenants diff --git a/test_runner/batch_pg_regress/test_isolation.py b/test_runner/batch_pg_regress/test_isolation.py index 1bd5aeb0b9..ae654401cc 100644 --- 
a/test_runner/batch_pg_regress/test_isolation.py +++ b/test_runner/batch_pg_regress/test_isolation.py @@ -1,11 +1,12 @@ import os from fixtures.utils import mkdir_if_needed +from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory pytest_plugins = ("fixtures.zenith_fixtures") -def test_isolation(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, +def test_isolation(pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, base_dir, capsys): # Create a branch for us diff --git a/test_runner/batch_pg_regress/test_pg_regress.py b/test_runner/batch_pg_regress/test_pg_regress.py index d119fdaff9..e45f3c199c 100644 --- a/test_runner/batch_pg_regress/test_pg_regress.py +++ b/test_runner/batch_pg_regress/test_pg_regress.py @@ -1,11 +1,12 @@ import os from fixtures.utils import mkdir_if_needed +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver pytest_plugins = ("fixtures.zenith_fixtures") -def test_pg_regress(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, +def test_pg_regress(pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, base_dir, capsys): # Create a branch for us diff --git a/test_runner/batch_pg_regress/test_zenith_regress.py b/test_runner/batch_pg_regress/test_zenith_regress.py index cf77cc41e5..ccda49295b 100644 --- a/test_runner/batch_pg_regress/test_zenith_regress.py +++ b/test_runner/batch_pg_regress/test_zenith_regress.py @@ -1,11 +1,12 @@ import os from fixtures.utils import mkdir_if_needed +from fixtures.zenith_fixtures import PostgresFactory pytest_plugins = ("fixtures.zenith_fixtures") -def test_zenith_regress(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, +def test_zenith_regress(postgres: PostgresFactory, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, base_dir, capsys): # Create a branch for us diff --git 
a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index b17992549a..154ca56b3e 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1,5 +1,7 @@ import getpass import os +import pathlib +import uuid import psycopg2 import pytest import shutil @@ -169,16 +171,15 @@ class ZenithPageserver(PgProtocol): """ An object representing a running pageserver. """ def __init__(self, zenith_cli: ZenithCli): super().__init__(host='localhost', port=DEFAULT_PAGESERVER_PORT) - self.zenith_cli = zenith_cli self.running = False + self.initial_tenant = None def init(self) -> 'ZenithPageserver': """ Initialize the repository, i.e. run "zenith init". Returns self. """ - self.zenith_cli.run(['init']) return self @@ -190,6 +191,8 @@ class ZenithPageserver(PgProtocol): self.zenith_cli.run(['start']) self.running = True + # get newly created tenant id + self.initial_tenant = self.zenith_cli.run(['tenant', 'list']).stdout.strip() return self def stop(self) -> 'ZenithPageserver': @@ -232,7 +235,7 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]: class Postgres(PgProtocol): """ An object representing a running postgres daemon. 
""" - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, instance_num: int): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, instance_num: int, tenant_id: str): super().__init__(host='localhost', port=55431 + instance_num) self.zenith_cli = zenith_cli @@ -240,12 +243,15 @@ class Postgres(PgProtocol): self.running = False self.repo_dir = repo_dir self.branch: Optional[str] = None # dubious, see asserts below - # path to conf is /pgdatadirs//postgresql.conf + self.tenant_id = tenant_id + # path to conf is /pgdatadirs/tenants///postgresql.conf - def create(self, - branch: str, - wal_acceptors: Optional[str] = None, - config_lines: Optional[List[str]] = None) -> 'Postgres': + def create( + self, + branch: str, + wal_acceptors: Optional[str] = None, + config_lines: Optional[List[str]] = None, + ) -> 'Postgres': """ Create the pg data directory. If wal_acceptors is not None, node will use wal acceptors; config is @@ -256,7 +262,7 @@ class Postgres(PgProtocol): if not config_lines: config_lines = [] - self.zenith_cli.run(['pg', 'create', branch]) + self.zenith_cli.run(['pg', 'create', branch, f'--tenantid={self.tenant_id}']) self.branch = branch if wal_acceptors is not None: self.adjust_for_wal_acceptors(wal_acceptors) @@ -273,14 +279,14 @@ class Postgres(PgProtocol): """ assert self.branch is not None - self.zenith_cli.run(['pg', 'start', self.branch]) + self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}']) self.running = True return self def config_file_path(self) -> str: """ Path to postgresql.conf """ - filename = f'pgdatadirs/{self.branch}/postgresql.conf' + filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf' return os.path.join(self.repo_dir, filename) def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres': @@ -326,7 +332,7 @@ class Postgres(PgProtocol): if self.running: assert self.branch is not None - self.zenith_cli.run(['pg', 'stop', self.branch]) + 
self.zenith_cli.run(['pg', 'stop', self.branch, f'--tenantid={self.tenant_id}']) self.running = False return self @@ -338,42 +344,57 @@ class Postgres(PgProtocol): """ assert self.branch is not None - self.zenith_cli.run(['pg', 'stop', '--destroy', self.branch]) + assert self.tenant_id is not None + self.zenith_cli.run(['pg', 'stop', '--destroy', self.branch, f'--tenantid={self.tenant_id}']) return self - def create_start(self, - branch: str, - wal_acceptors: Optional[str] = None, - config_lines: Optional[List[str]] = None) -> 'Postgres': + def create_start( + self, + branch: str, + wal_acceptors: Optional[str] = None, + config_lines: Optional[List[str]] = None, + ) -> 'Postgres': """ Create a Postgres instance, then start it. Returns self. """ - self.create(branch, wal_acceptors, config_lines).start() + self.create( + branch=branch, + wal_acceptors=wal_acceptors, + config_lines=config_lines, + ).start() return self class PostgresFactory: """ An object representing multiple running postgres daemons. 
""" - def __init__(self, zenith_cli: ZenithCli, repo_dir: str): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str): self.zenith_cli = zenith_cli self.repo_dir = repo_dir self.num_instances = 0 self.instances: List[Postgres] = [] + self.initial_tenant: str = initial_tenant - def create_start(self, - branch: str = "main", - wal_acceptors: Optional[str] = None, - config_lines: Optional[List[str]] = None) -> Postgres: + def create_start( + self, + branch: str = "main", + tenant_id: Optional[str] = None, + wal_acceptors: Optional[str] = None, + config_lines: Optional[List[str]] = None + ) -> Postgres: - pg = Postgres(self.zenith_cli, self.repo_dir, self.num_instances + 1) + pg = Postgres(self.zenith_cli, self.repo_dir, self.num_instances + 1, tenant_id=tenant_id or self.initial_tenant) self.num_instances += 1 self.instances.append(pg) - return pg.create_start(branch, wal_acceptors, config_lines) + return pg.create_start( + branch=branch, + wal_acceptors=wal_acceptors, + config_lines=config_lines, + ) def stop_all(self) -> 'PostgresFactory': for pg in self.instances: @@ -381,10 +402,14 @@ class PostgresFactory: return self +@zenfixture +def initial_tenant(pageserver: ZenithPageserver): + return pageserver.initial_tenant + @zenfixture -def postgres(zenith_cli: ZenithCli, repo_dir: str) -> Iterator[PostgresFactory]: - pgfactory = PostgresFactory(zenith_cli, repo_dir) +def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]: + pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant) yield pgfactory @@ -636,3 +661,20 @@ def pg_distrib_dir(base_dir: str) -> str: if not os.path.exists(os.path.join(pg_dir, 'bin/postgres')): raise Exception('postgres not found at "{}"'.format(pg_dir)) return pg_dir + + +class TenantFactory: + def __init__(self, cli: ZenithCli): + self.cli = cli + + def create(self, tenant_id: Optional[str] = None): + if tenant_id is None: + tenant_id = 
uuid.uuid4().hex + res = self.cli.run(['tenant', 'create', tenant_id]) + res.check_returncode() + return tenant_id + + +@zenfixture +def tenant_factory(zenith_cli: ZenithCli): + return TenantFactory(zenith_cli) diff --git a/vendor/postgres b/vendor/postgres index 8ab674ad99..4cb9578650 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 8ab674ad9927095eeaf278fc67d6f8ad5e38c9cb +Subproject commit 4cb95786504e2d3cf8539ef901eb88c0615691d0 diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 15626bd360..f12b6263f5 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -21,7 +21,7 @@ use zenith_utils::lsn::Lsn; use crate::replication::HotStandbyFeedback; use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; -use pageserver::ZTimelineId; +use pageserver::{ZTenantId, ZTimelineId}; use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; use zenith_utils::pq_proto::SystemId; @@ -52,6 +52,7 @@ pub struct ServerInfo { pub wal_end: Lsn, pub timeline: TimeLineID, pub wal_seg_size: u32, + pub tenant_id: ZTenantId, } /// Vote request sent from proposer to safekeepers @@ -101,6 +102,7 @@ impl SafeKeeperInfo { wal_end: Lsn(0), timeline: 0, wal_seg_size: 0, + tenant_id: ZTenantId::from([0u8; 16]), }, commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ flush_lsn: Lsn(0), /* locally flushed part of WAL */ @@ -149,7 +151,7 @@ pub struct ReceiveWalConn { /// Periodically request pageserver to call back. 
/// If pageserver already has replication channel, it will just ignore this request /// -fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId) { +fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZTenantId) { let ps_addr = conf.pageserver_addr.unwrap(); let ps_connstr = format!("postgresql://no_user@{}/no_db", ps_addr); @@ -158,9 +160,10 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId) { let me_conf: Config = me_connstr.parse().unwrap(); let (host, port) = connection_host_port(&me_conf); let callme = format!( - "callmemaybe {} host={} port={} options='-c ztimelineid={}'", - timelineid, host, port, timelineid + "callmemaybe {} {} host={} port={} options='-c ztimelineid={}'", + tenantid, timelineid, host, port, timelineid, ); + loop { info!( "requesting page server to connect to us: start {} {}", @@ -206,8 +209,8 @@ impl ReceiveWalConn { // Receive information about server let server_info = self.read_req::()?; info!( - "Start handshake with wal_proposer {} sysid {} timeline {}", - self.peer_addr, server_info.system_id, server_info.timeline_id, + "Start handshake with wal_proposer {} sysid {} timeline {} tenant {}", + self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id, ); // FIXME: also check that the system identifier matches self.timeline.set(server_info.timeline_id)?; @@ -274,14 +277,15 @@ impl ReceiveWalConn { // Add far as replication in postgres is initiated by receiver, we should use callme mechanism let conf = self.conf.clone(); let timelineid = self.timeline.get().timelineid; + let tenantid = server_info.tenant_id; thread::spawn(move || { - request_callback(conf, timelineid); + request_callback(conf, timelineid, tenantid); }); } info!( - "Start streaming from timeline {} address {:?}", - server_info.timeline_id, self.peer_addr, + "Start streaming from timeline {} tenant {} address {:?}", + server_info.timeline_id, server_info.tenant_id, self.peer_addr, ); // 
Main loop diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 413114ac1c..4c9c019893 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -4,6 +4,7 @@ use clap::{App, AppSettings, Arg, ArgMatches, SubCommand}; use control_plane::compute::ComputeControlPlane; use control_plane::local_env::{self, LocalEnv}; use control_plane::storage::PageServerNode; +use pageserver::ZTenantId; use std::collections::btree_map::Entry; use std::collections::HashMap; use std::process::exit; @@ -33,9 +34,15 @@ fn main() -> Result<()> { let timeline_arg = Arg::with_name("timeline") .short("n") .index(1) - .help("timeline name") + .help("Timeline name") .required(true); + let tenantid_arg = Arg::with_name("tenantid") + .long("tenantid") + .help("Tenant id. Represented as a hexadecimal string 32 symbols length") + .takes_value(true) + .required(false); + let matches = App::new("Zenith CLI") .setting(AppSettings::ArgRequiredElseHelp) .subcommand( @@ -52,7 +59,14 @@ fn main() -> Result<()> { SubCommand::with_name("branch") .about("Create a new branch") .arg(Arg::with_name("branchname").required(false).index(1)) - .arg(Arg::with_name("start-point").required(false).index(2)), + .arg(Arg::with_name("start-point").required(false).index(2)) + .arg(tenantid_arg.clone()), + ).subcommand( + SubCommand::with_name("tenant") + .setting(AppSettings::ArgRequiredElseHelp) + .about("Manage tenants") + .subcommand(SubCommand::with_name("list")) + .subcommand(SubCommand::with_name("create").arg(Arg::with_name("tenantid").required(false).index(1))) ) .subcommand(SubCommand::with_name("status")) .subcommand(SubCommand::with_name("start").about("Start local pageserver")) @@ -62,12 +76,13 @@ fn main() -> Result<()> { SubCommand::with_name("pg") .setting(AppSettings::ArgRequiredElseHelp) .about("Manage postgres instances") - .subcommand(SubCommand::with_name("list")) - .subcommand(SubCommand::with_name("create").arg(timeline_arg.clone())) - 
.subcommand(SubCommand::with_name("start").arg(timeline_arg.clone())) + .subcommand(SubCommand::with_name("list").arg(tenantid_arg.clone())) + .subcommand(SubCommand::with_name("create").arg(timeline_arg.clone()).arg(tenantid_arg.clone())) + .subcommand(SubCommand::with_name("start").arg(timeline_arg.clone()).arg(tenantid_arg.clone())) .subcommand( SubCommand::with_name("stop") .arg(timeline_arg.clone()) + .arg(tenantid_arg.clone()) .arg( Arg::with_name("destroy") .help("Also delete data directory (now optional, should be default in future)") @@ -101,8 +116,10 @@ fn main() -> Result<()> { // Create config file if let ("init", Some(sub_args)) = matches.subcommand() { + let tenantid = ZTenantId::generate(); let pageserver_uri = sub_args.value_of("pageserver-url"); - local_env::init(pageserver_uri).with_context(|| "Failed to create config file")?; + local_env::init(pageserver_uri, tenantid) + .with_context(|| "Failed to create config file")?; } // all other commands would need config @@ -117,11 +134,17 @@ fn main() -> Result<()> { match matches.subcommand() { ("init", Some(_)) => { let pageserver = PageServerNode::from_env(&env); - if let Err(e) = pageserver.init() { + if let Err(e) = pageserver.init(Some(&env.tenantid.to_string())) { eprintln!("pageserver init failed: {}", e); exit(1); } } + ("tenant", Some(args)) => { + if let Err(e) = handle_tenant(args, &env) { + eprintln!("tenant command failed: {}", e); + exit(1); + } + } ("branch", Some(sub_args)) => { if let Err(e) = handle_branch(sub_args, &env) { @@ -308,9 +331,12 @@ fn print_branch( /// Returns a map of timeline IDs to branch_name@lsn strings. /// Connects to the pageserver to query this information. 
-fn get_branch_infos(env: &local_env::LocalEnv) -> Result> { +fn get_branch_infos( + env: &local_env::LocalEnv, + tenantid: &ZTenantId, +) -> Result> { let page_server = PageServerNode::from_env(env); - let branch_infos: Vec = page_server.branches_list()?; + let branch_infos: Vec = page_server.branches_list(tenantid)?; let branch_infos: HashMap = branch_infos .into_iter() .map(|branch_info| (branch_info.timeline_id, branch_info)) @@ -319,23 +345,51 @@ fn get_branch_infos(env: &local_env::LocalEnv) -> Result Result<()> { + let pageserver = PageServerNode::from_env(&env); + match tenant_match.subcommand() { + ("list", Some(_)) => { + for tenant in pageserver.tenants_list()? { + println!("{}", tenant); + } + } + ("create", Some(create_match)) => { + let tenantid = match create_match.value_of("tenantid") { + Some(tenantid) => ZTenantId::from_str(tenantid)?, + None => ZTenantId::generate(), + }; + println!("using tenant id {}", tenantid); + pageserver.tenant_create(&tenantid)?; + println!("tenant successfully created on the pageserver"); + } + _ => {} + } + Ok(()) +} + fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let pageserver = PageServerNode::from_env(&env); if let Some(branchname) = branch_match.value_of("branchname") { - if let Some(startpoint_str) = branch_match.value_of("start-point") { - let branch = pageserver.branch_create(branchname, startpoint_str)?; - println!( - "Created branch '{}' at {:?}", - branch.name, - branch.latest_valid_lsn.unwrap_or(Lsn(0)) - ); - } else { - bail!("Missing start-point"); - } + let startpoint_str = branch_match + .value_of("start-point") + .ok_or(anyhow!("Missing start-point"))?; + let tenantid: ZTenantId = branch_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + let branch = pageserver.branch_create(branchname, startpoint_str, &tenantid)?; + println!( + "Created branch '{}' at {:?} for tenant: {}", + branch.name, + 
branch.latest_valid_lsn.unwrap_or(Lsn(0)), + tenantid, + ); } else { - // No arguments, list branches - let branches = pageserver.branches_list()?; + let tenantid: ZTenantId = branch_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + // No arguments, list branches for tenant + let branches = pageserver.branches_list(&tenantid)?; print_branches_tree(branches)?; } @@ -346,14 +400,21 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let mut cplane = ComputeControlPlane::load(env.clone())?; match pg_match.subcommand() { - ("list", Some(_sub_m)) => { - let branch_infos = get_branch_infos(env).unwrap_or_else(|e| { + ("list", Some(list_match)) => { + let tenantid: ZTenantId = list_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + let branch_infos = get_branch_infos(env, &tenantid).unwrap_or_else(|e| { eprintln!("Failed to load branch info: {}", e); HashMap::new() }); println!("BRANCH\tADDRESS\t\tLSN\t\tSTATUS"); - for (timeline_name, node) in cplane.nodes.iter() { + for ((_, timeline_name), node) in cplane + .nodes + .iter() + .filter(|((node_tenantid, _), _)| node_tenantid == &tenantid) + { println!( "{}\t{}\t{}\t{}", timeline_name, @@ -368,30 +429,42 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { ); } } - ("create", Some(sub_m)) => { - let timeline_name = sub_m.value_of("timeline").unwrap_or("main"); - cplane.new_node(timeline_name)?; - } - ("start", Some(sub_m)) => { - let timeline_name = sub_m.value_of("timeline").unwrap_or("main"); + ("create", Some(create_match)) => { + let tenantid: ZTenantId = create_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + let timeline_name = create_match.value_of("timeline").unwrap_or("main"); + // check is that timeline doesnt already exist + // this check here is because it - let node = cplane.nodes.get(timeline_name); + cplane.new_node(tenantid, timeline_name)?; + } 
+ ("start", Some(start_match)) => { + let tenantid: ZTenantId = start_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; + let timeline_name = start_match.value_of("timeline").unwrap_or("main"); + + let node = cplane.nodes.get(&(tenantid, timeline_name.to_owned())); println!("Starting postgres on timeline {}...", timeline_name); if let Some(node) = node { node.start()?; } else { - let node = cplane.new_node(timeline_name)?; + let node = cplane.new_node(tenantid, timeline_name)?; node.start()?; } } - ("stop", Some(sub_m)) => { - let timeline_name = sub_m.value_of("timeline").unwrap_or("main"); - let destroy = sub_m.is_present("destroy"); + ("stop", Some(stop_match)) => { + let timeline_name = stop_match.value_of("timeline").unwrap_or("main"); + let destroy = stop_match.is_present("destroy"); + let tenantid: ZTenantId = stop_match + .value_of("tenantid") + .map_or(Ok(env.tenantid), |value| value.parse())?; let node = cplane .nodes - .get(timeline_name) + .get(&(tenantid, timeline_name.to_owned())) .ok_or_else(|| anyhow!("postgres {} is not found", timeline_name))?; node.stop(destroy)?; }