Update Zenith CLI config between runs

This commit is contained in:
Kirill Bulatov
2022-02-24 13:40:32 +03:00
committed by Kirill Bulatov
parent f49990ed43
commit 4d0f7fd1e4
40 changed files with 371 additions and 415 deletions

View File

@@ -48,7 +48,7 @@ jobs:
echo Python
python3 --version
poetry run python3 --version
echo Pipenv
echo Poetry
poetry --version
echo Pgbench
$PG_BIN/pgbench --version

View File

@@ -147,7 +147,7 @@ impl PostgresNode {
// Read a few options from the config file
let context = format!("in config file {}", cfg_path_str);
let port: u16 = conf.parse_field("port", &context)?;
let timelineid: ZTimelineId = conf.parse_field("zenith.zenith_timeline", &context)?;
let timeline_id: ZTimelineId = conf.parse_field("zenith.zenith_timeline", &context)?;
let tenant_id: ZTenantId = conf.parse_field("zenith.zenith_tenant", &context)?;
let uses_wal_proposer = conf.get("wal_acceptors").is_some();
@@ -162,7 +162,7 @@ impl PostgresNode {
env: env.clone(),
pageserver: Arc::clone(pageserver),
is_test: false,
timeline_id: timelineid,
timeline_id,
lsn: recovery_target_lsn,
tenant_id,
uses_wal_proposer,

View File

@@ -3,17 +3,16 @@
//! Now it also provides init method which acts like a stub for proper installation
//! script which will use local paths.
use anyhow::{bail, Context};
use anyhow::{bail, ensure, Context};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::fmt::Write;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId, ZTimelineId};
use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId, ZTenantTimelineId};
use crate::safekeeper::SafekeeperNode;
@@ -24,7 +23,7 @@ use crate::safekeeper::SafekeeperNode;
// to 'zenith init --config=<path>' option. See control_plane/simple.conf for
// an example.
//
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct LocalEnv {
// Base directory for all the nodes (the pageserver, safekeepers and
// compute nodes).
@@ -63,12 +62,10 @@ pub struct LocalEnv {
/// Every tenant has a first timeline created for it, currently the only one ancestor-less for this tenant.
/// It is used as a default timeline for branching, if no ancestor timeline is specified.
#[serde(default)]
// TODO kb this does not survive calls between invocations, so will have to persist it.
// Then it comes back to names again?
pub initial_timelines: HashMap<ZTenantId, ZTimelineId>,
pub branch_name_mappings: HashMap<String, ZTenantTimelineId>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct PageServerConf {
// node id
@@ -96,7 +93,7 @@ impl Default for PageServerConf {
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct SafekeeperConf {
pub id: ZNodeId,
@@ -222,6 +219,39 @@ impl LocalEnv {
Ok(env)
}
pub fn persist_config(&self, base_path: &Path) -> anyhow::Result<()> {
// Currently, the user first passes a config file with 'zenith init --config=<path>'
// We read that in, in `create_config`, and fill any missing defaults. Then it's saved
// to .zenith/config. TODO: We lose any formatting and comments along the way, which is
// a bit sad.
let mut conf_content = r#"# This file describes a locale deployment of the page server
# and safekeeeper node. It is read by the 'zenith' command-line
# utility.
"#
.to_string();
// Convert the LocalEnv to a toml file.
//
// This could be as simple as this:
//
// conf_content += &toml::to_string_pretty(env)?;
//
// But it results in a "values must be emitted before tables". I'm not sure
// why, AFAICS the table, i.e. 'safekeepers: Vec<SafekeeperConf>' is last.
// Maybe rust reorders the fields to squeeze avoid padding or something?
// In any case, converting to toml::Value first, and serializing that, works.
// See https://github.com/alexcrichton/toml-rs/issues/142
conf_content += &toml::to_string_pretty(&toml::Value::try_from(self)?)?;
let target_config_path = base_path.join("config");
fs::write(&target_config_path, conf_content).with_context(|| {
format!(
"Failed to write config file into path '{}'",
target_config_path.display()
)
})
}
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
let private_key_path = if self.private_key_path.is_absolute() {
@@ -240,15 +270,15 @@ impl LocalEnv {
pub fn init(&mut self) -> anyhow::Result<()> {
// check if config already exists
let base_path = &self.base_data_dir;
if base_path == Path::new("") {
bail!("repository base path is missing");
}
if base_path.exists() {
bail!(
"directory '{}' already exists. Perhaps already initialized?",
base_path.to_str().unwrap()
);
}
ensure!(
base_path != Path::new(""),
"repository base path is missing"
);
ensure!(
!base_path.exists(),
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
fs::create_dir(&base_path)?;
@@ -300,36 +330,7 @@ impl LocalEnv {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
}
let mut conf_content = String::new();
// Currently, the user first passes a config file with 'zenith init --config=<path>'
// We read that in, in `create_config`, and fill any missing defaults. Then it's saved
// to .zenith/config. TODO: We lose any formatting and comments along the way, which is
// a bit sad.
write!(
&mut conf_content,
r#"# This file describes a locale deployment of the page server
# and safekeeeper node. It is read by the 'zenith' command-line
# utility.
"#
)?;
// Convert the LocalEnv to a toml file.
//
// This could be as simple as this:
//
// conf_content += &toml::to_string_pretty(env)?;
//
// But it results in a "values must be emitted before tables". I'm not sure
// why, AFAICS the table, i.e. 'safekeepers: Vec<SafekeeperConf>' is last.
// Maybe rust reorders the fields to squeeze avoid padding or something?
// In any case, converting to toml::Value first, and serializing that, works.
// See https://github.com/alexcrichton/toml-rs/issues/142
conf_content += &toml::to_string_pretty(&toml::Value::try_from(&self)?)?;
fs::write(base_path.join("config"), conf_content)?;
Ok(())
self.persist_config(base_path)
}
}

View File

@@ -1,7 +1,5 @@
//!
//! Timeline management code
//!
// TODO: move all paths construction to conf impl
//
use anyhow::{bail, Context, Result};

View File

@@ -25,21 +25,24 @@ def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder):
ps.safe_psql("set FOO", password=tenant_token)
ps.safe_psql("set FOO", password=management_token)
new_timeline_id = env.zenith_cli.create_branch('test_pageserver_auth',
tenant_id=env.initial_tenant)
# tenant can create branches
tenant_http_client.timeline_create(timeline_id=uuid4(),
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline)
ancestor_timeline_id=new_timeline_id)
# console can create branches for tenant
management_http_client.timeline_create(timeline_id=uuid4(),
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline)
ancestor_timeline_id=new_timeline_id)
# fail to create branch using token with different tenant_id
with pytest.raises(ZenithPageserverApiException,
match='Forbidden: Tenant id mismatch. Permission denied'):
invalid_tenant_http_client.timeline_create(timeline_id=uuid4(),
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline)
ancestor_timeline_id=new_timeline_id)
# create tenant using management token
management_http_client.tenant_create(uuid4())
@@ -59,9 +62,9 @@ def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_w
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}"
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start(branch, timeline_id=new_timeline_id)
branch = f'test_compute_auth_to_pageserver{with_wal_acceptors}'
env.zenith_cli.create_branch(branch)
pg = env.postgres.create_start(branch)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:

View File

@@ -95,7 +95,7 @@ def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
# Create a branch for us
env.zenith_cli.create_branch("test_backpressure", "main")
env.zenith_cli.create_branch('test_backpressure')
pg = env.postgres.create_start('test_backpressure',
config_lines=['max_replication_write_lag=30MB'])

View File

@@ -22,9 +22,8 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
env = zenith_env_builder.init_start()
# Branch at the point where only 100 rows were inserted
test_branch_behind_timeline_id = env.zenith_cli.branch_timeline()
pgmain = env.postgres.create_start('test_branch_behind',
timeline_id=test_branch_behind_timeline_id)
env.zenith_cli.create_branch('test_branch_behind')
pgmain = env.postgres.create_start('test_branch_behind')
log.info("postgres is running on 'test_branch_behind' branch")
main_pg_conn = pgmain.connect()
@@ -60,8 +59,9 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
log.info(f'LSN after 200100 rows: {lsn_b}')
# Branch at the point where only 100 rows were inserted
test_branch_behind_hundred_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_branch_behind_timeline_id, ancestor_start_lsn=lsn_a)
env.zenith_cli.create_branch('test_branch_behind_hundred',
'test_branch_behind',
ancestor_start_lsn=lsn_a)
# Insert many more rows. This generates enough WAL to fill a few segments.
main_cur.execute('''
@@ -76,13 +76,12 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
log.info(f'LSN after 400100 rows: {lsn_c}')
# Branch at the point where only 200100 rows were inserted
test_branch_behind_more_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_branch_behind_timeline_id, ancestor_start_lsn=lsn_b)
env.zenith_cli.create_branch('test_branch_behind_more',
'test_branch_behind',
ancestor_start_lsn=lsn_b)
pg_hundred = env.postgres.create_start("test_branch_behind_hundred",
timeline_id=test_branch_behind_hundred_timeline_id)
pg_more = env.postgres.create_start("test_branch_behind_more",
timeline_id=test_branch_behind_more_timeline_id)
pg_hundred = env.postgres.create_start('test_branch_behind_hundred')
pg_more = env.postgres.create_start('test_branch_behind_more')
# On the 'hundred' branch, we should see only 100 rows
hundred_pg_conn = pg_hundred.connect()
@@ -103,23 +102,23 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
# Check bad lsn's for branching
# branch at segment boundary
test_branch_segment_boundary_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_branch_behind_timeline_id, ancestor_start_lsn="0/3000000")
pg = env.postgres.create_start("test_branch_segment_boundary",
timeline_id=test_branch_segment_boundary_timeline_id)
env.zenith_cli.create_branch('test_branch_segment_boundary',
'test_branch_behind',
ancestor_start_lsn="0/3000000")
pg = env.postgres.create_start('test_branch_segment_boundary')
cur = pg.connect().cursor()
cur.execute('SELECT 1')
assert cur.fetchone() == (1, )
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn"):
env.zenith_cli.branch_timeline(ancestor_timeline_id=env.initial_timeline,
ancestor_start_lsn="0/42")
env.zenith_cli.create_branch('test_branch_preinitdb', ancestor_start_lsn="0/42")
# branch at pre-ancestor lsn
with pytest.raises(Exception, match="less than timeline ancestor lsn"):
env.zenith_cli.branch_timeline(ancestor_timeline_id=test_branch_behind_timeline_id,
ancestor_start_lsn="0/42")
env.zenith_cli.create_branch('test_branch_preinitdb',
'test_branch_behind',
ancestor_start_lsn="0/42")
# check that we cannot create branch based on garbage collected data
with closing(env.pageserver.connect()) as psconn:
@@ -131,8 +130,9 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
with pytest.raises(Exception, match="invalid branch start lsn"):
# this gced_lsn is pretty random, so if gc is disabled this woudln't fail
env.zenith_cli.branch_timeline(ancestor_timeline_id=test_branch_behind_timeline_id,
ancestor_start_lsn=gced_lsn)
env.zenith_cli.create_branch('test_branch_create_fail',
'test_branch_behind',
ancestor_start_lsn=gced_lsn)
# check that after gc everything is still there
hundred_cur.execute('SELECT count(*) FROM foo')

View File

@@ -12,7 +12,7 @@ from fixtures.log_helper import log
#
def test_clog_truncate(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
test_clog_truncate_timeline_id = env.zenith_cli.branch_timeline()
env.zenith_cli.create_branch('test_clog_truncate', 'empty')
# set agressive autovacuum to make sure that truncation will happen
config = [
@@ -25,9 +25,7 @@ def test_clog_truncate(zenith_simple_env: ZenithEnv):
'autovacuum_freeze_max_age=100000'
]
pg = env.postgres.create_start('test_clog_truncate',
config_lines=config,
timeline_id=test_clog_truncate_timeline_id)
pg = env.postgres.create_start('test_clog_truncate', config_lines=config)
log.info('postgres is running on test_clog_truncate branch')
# Install extension containing function needed for test
@@ -64,11 +62,10 @@ def test_clog_truncate(zenith_simple_env: ZenithEnv):
# create new branch after clog truncation and start a compute node on it
log.info(f'create branch at lsn_after_truncation {lsn_after_truncation}')
test_clog_truncate_new_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_clog_truncate_timeline_id,
ancestor_start_lsn=lsn_after_truncation)
pg2 = env.postgres.create_start('test_clog_truncate_new',
timeline_id=test_clog_truncate_new_timeline_id)
env.zenith_cli.create_branch('test_clog_truncate_new',
'test_clog_truncate',
ancestor_start_lsn=lsn_after_truncation)
pg2 = env.postgres.create_start('test_clog_truncate_new')
log.info('postgres is running on test_clog_truncate_new branch')
# check that new node doesn't contain truncated segment

View File

@@ -9,10 +9,10 @@ from fixtures.log_helper import log
#
def test_config(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_config',
config_lines=['log_min_messages=debug1'],
timeline_id=new_timeline_id)
env.zenith_cli.create_branch("test_config", "empty")
# change config
pg = env.postgres.create_start('test_config', config_lines=['log_min_messages=debug1'])
log.info('postgres is running on test_config branch')
with closing(pg.connect()) as conn:

View File

@@ -11,9 +11,9 @@ from fixtures.log_helper import log
#
def test_createdb(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
test_createdb_timeline_id = env.zenith_cli.branch_timeline()
env.zenith_cli.create_branch('test_createdb', 'empty')
pg = env.postgres.create_start('test_createdb', timeline_id=test_createdb_timeline_id)
pg = env.postgres.create_start('test_createdb')
log.info("postgres is running on 'test_createdb' branch")
with closing(pg.connect()) as conn:
@@ -27,9 +27,8 @@ def test_createdb(zenith_simple_env: ZenithEnv):
lsn = cur.fetchone()[0]
# Create a branch
test_createdb2_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_createdb_timeline_id, ancestor_start_lsn=lsn)
pg2 = env.postgres.create_start('test_createdb2', timeline_id=test_createdb2_timeline_id)
env.zenith_cli.create_branch('test_createdb2', 'test_createdb', ancestor_start_lsn=lsn)
pg2 = env.postgres.create_start('test_createdb2')
# Test that you can connect to the new database on both branches
for db in (pg, pg2):
@@ -41,8 +40,8 @@ def test_createdb(zenith_simple_env: ZenithEnv):
#
def test_dropdb(zenith_simple_env: ZenithEnv, test_output_dir):
env = zenith_simple_env
test_dropdb_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_dropdb', timeline_id=test_dropdb_timeline_id)
env.zenith_cli.create_branch('test_dropdb', 'empty')
pg = env.postgres.create_start('test_dropdb')
log.info("postgres is running on 'test_dropdb' branch")
with closing(pg.connect()) as conn:
@@ -65,15 +64,15 @@ def test_dropdb(zenith_simple_env: ZenithEnv, test_output_dir):
lsn_after_drop = cur.fetchone()[0]
# Create two branches before and after database drop.
test_before_dropdb_timeline_db = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_dropdb_timeline_id, ancestor_start_lsn=lsn_before_drop)
pg_before = env.postgres.create_start('test_before_dropdb',
timeline_id=test_before_dropdb_timeline_db)
env.zenith_cli.create_branch('test_before_dropdb',
'test_dropdb',
ancestor_start_lsn=lsn_before_drop)
pg_before = env.postgres.create_start('test_before_dropdb')
test_after_dropdb_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_dropdb_timeline_id, ancestor_start_lsn=lsn_after_drop)
pg_after = env.postgres.create_start('test_after_dropdb',
timeline_id=test_after_dropdb_timeline_id)
env.zenith_cli.create_branch('test_after_dropdb',
'test_dropdb',
ancestor_start_lsn=lsn_after_drop)
pg_after = env.postgres.create_start('test_after_dropdb')
# Test that database exists on the branch before drop
pg_before.connect(dbname='foodb').close()

View File

@@ -9,8 +9,8 @@ from fixtures.log_helper import log
#
def test_createuser(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
test_createuser_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_createuser', timeline_id=test_createuser_timeline_id)
env.zenith_cli.create_branch('test_createuser', 'empty')
pg = env.postgres.create_start('test_createuser')
log.info("postgres is running on 'test_createuser' branch")
with closing(pg.connect()) as conn:
@@ -24,9 +24,8 @@ def test_createuser(zenith_simple_env: ZenithEnv):
lsn = cur.fetchone()[0]
# Create a branch
test_createuser2_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_createuser_timeline_id, ancestor_start_lsn=lsn)
pg2 = env.postgres.create_start('test_createuser2', timeline_id=test_createuser2_timeline_id)
env.zenith_cli.create_branch('test_createuser2', 'test_createuser', ancestor_start_lsn=lsn)
pg2 = env.postgres.create_start('test_createuser2')
# Test that you can connect to new branch as a new user
assert pg2.safe_psql('select current_user', username='testuser') == [('testuser', )]

View File

@@ -1,6 +1,7 @@
from contextlib import closing
import asyncio
import asyncpg
import random
from fixtures.zenith_fixtures import ZenithEnv, Postgres, Safekeeper
@@ -54,8 +55,8 @@ async def update_and_gc(env: ZenithEnv, pg: Postgres, timeline: str):
#
def test_gc_aggressive(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_gc_aggressive', timeline_id=new_timeline_id)
env.zenith_cli.create_branch("test_gc_aggressive", "empty")
pg = env.postgres.create_start('test_gc_aggressive')
log.info('postgres is running on test_gc_aggressive branch')
conn = pg.connect()

View File

@@ -10,8 +10,8 @@ from fixtures.log_helper import log
#
def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir):
env = zenith_simple_env
test_multixact_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_multixact', timeline_id=test_multixact_timeline_id)
env.zenith_cli.create_branch('test_multixact', 'empty')
pg = env.postgres.create_start('test_multixact')
log.info("postgres is running on 'test_multixact' branch")
pg_conn = pg.connect()
@@ -60,10 +60,8 @@ def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir):
assert int(next_multixact_id) > int(next_multixact_id_old)
# Branch at this point
test_multixact_new_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_multixact_timeline_id, ancestor_start_lsn=lsn)
pg_new = env.postgres.create_start('test_multixact_new',
timeline_id=test_multixact_new_timeline_id)
env.zenith_cli.create_branch('test_multixact_new', 'test_multixact', ancestor_start_lsn=lsn)
pg_new = env.postgres.create_start('test_multixact_new')
log.info("postgres is running on 'test_multixact_new' branch")
pg_new_conn = pg_new.connect()

View File

@@ -16,8 +16,8 @@ from fixtures.log_helper import log
#
def test_old_request_lsn(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_old_request_lsn', timeline_id=new_timeline_id)
env.zenith_cli.create_branch("test_old_request_lsn", "empty")
pg = env.postgres.create_start('test_old_request_lsn')
log.info('postgres is running on test_old_request_lsn branch')
pg_conn = pg.connect()

View File

@@ -16,9 +16,8 @@ def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuil
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down',
timeline_id=new_timeline_id)
env.zenith_cli.create_branch('test_pageserver_catchup_while_compute_down')
pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down')
pg_conn = pg.connect()
cur = pg_conn.cursor()
@@ -60,8 +59,7 @@ def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuil
env.safekeepers[2].start()
# restart compute node
pg.stop_and_destroy().create_start('test_pageserver_catchup_while_compute_down',
timeline_id=new_timeline_id)
pg.stop_and_destroy().create_start('test_pageserver_catchup_while_compute_down')
# Ensure that basebackup went correct and pageserver returned all data
pg_conn = pg.connect()

View File

@@ -15,8 +15,8 @@ def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_pageserver_restart', timeline_id=new_timeline_id)
env.zenith_cli.create_branch('test_pageserver_restart')
pg = env.postgres.create_start('test_pageserver_restart')
pg_conn = pg.connect()
cur = pg_conn.cursor()

View File

@@ -35,8 +35,8 @@ async def parallel_load_same_table(pg: Postgres, n_parallel: int):
# Load data into one table with COPY TO from 5 parallel connections
def test_parallel_copy(zenith_simple_env: ZenithEnv, n_parallel=5):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_parallel_copy', timeline_id=new_timeline_id)
env.zenith_cli.create_branch("test_parallel_copy", "empty")
pg = env.postgres.create_start('test_parallel_copy')
log.info("postgres is running on 'test_parallel_copy' branch")
# Create test table

View File

@@ -4,8 +4,8 @@ from fixtures.log_helper import log
def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_pgbench', timeline_id=new_timeline_id)
env.zenith_cli.create_branch("test_pgbench", "empty")
pg = env.postgres.create_start('test_pgbench')
log.info("postgres is running on 'test_pgbench' branch")
connstr = pg.connstr()

View File

@@ -11,9 +11,8 @@ from fixtures.zenith_fixtures import ZenithEnv
#
def test_readonly_node(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
test_readonly_node_timeline_id = env.zenith_cli.branch_timeline()
pgmain = env.postgres.create_start('test_readonly_node',
timeline_id=test_readonly_node_timeline_id)
env.zenith_cli.create_branch('test_readonly_node', 'empty')
pgmain = env.postgres.create_start('test_readonly_node')
log.info("postgres is running on 'test_readonly_node' branch")
main_pg_conn = pgmain.connect()
@@ -53,14 +52,10 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
log.info('LSN after 400100 rows: ' + lsn_c)
# Create first read-only node at the point where only 100 rows were inserted
pg_hundred = env.postgres.create_start("test_readonly_node_hundred",
timeline_id=test_readonly_node_timeline_id,
lsn=lsn_a)
pg_hundred = env.postgres.create_start("test_readonly_node_hundred", lsn=lsn_a)
# And another at the point where 200100 rows were inserted
pg_more = env.postgres.create_start("test_readonly_node_more",
timeline_id=test_readonly_node_timeline_id,
lsn=lsn_b)
pg_more = env.postgres.create_start("test_readonly_node_more", lsn=lsn_b)
# On the 'hundred' node, we should see only 100 rows
hundred_pg_conn = pg_hundred.connect()
@@ -79,9 +74,7 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
assert main_cur.fetchone() == (400100, )
# Check creating a node at segment boundary
pg = env.postgres.create_start("test_branch_segment_boundary",
timeline_id=test_readonly_node_timeline_id,
lsn='0/3000000')
pg = env.postgres.create_start("test_branch_segment_boundary", lsn='0/3000000')
cur = pg.connect().cursor()
cur.execute('SELECT 1')
assert cur.fetchone() == (1, )
@@ -89,6 +82,4 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
# Create node at pre-initdb lsn
with pytest.raises(Exception, match="invalid basebackup lsn"):
# compute node startup with invalid LSN should fail
env.zenith_cli.pg_start("test_readonly_node_preinitdb",
timeline_id=test_readonly_node_timeline_id,
lsn="0/42")
env.zenith_cli.pg_start("test_readonly_node_preinitdb", lsn="0/42")

View File

@@ -15,8 +15,8 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_restart_compute', timeline_id=new_timeline_id)
env.zenith_cli.create_branch('test_restart_compute')
pg = env.postgres.create_start('test_restart_compute')
log.info("postgres is running on 'test_restart_compute' branch")
with closing(pg.connect()) as conn:
@@ -29,7 +29,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor
log.info(f"res = {r}")
# Remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute', timeline_id=new_timeline_id)
pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
@@ -48,7 +48,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor
log.info(f"res = {r}")
# Again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute', timeline_id=new_timeline_id)
pg.stop_and_destroy().create_start('test_restart_compute')
# That select causes lots of FPI's and increases probability of wakeepers
# lagging behind after query completion
@@ -62,7 +62,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor
log.info(f"res = {r}")
# And again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute', timeline_id=new_timeline_id)
pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:

View File

@@ -14,8 +14,8 @@ from fixtures.log_helper import log
#
def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_layerfiles_gc', timeline_id=new_timeline_id)
env.zenith_cli.create_branch("test_layerfiles_gc", "empty")
pg = env.postgres.create_start('test_layerfiles_gc')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:

View File

@@ -10,8 +10,8 @@ from fixtures.log_helper import log
# CLOG.
def test_subxacts(zenith_simple_env: ZenithEnv, test_output_dir):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_subxacts', timeline_id=new_timeline_id)
env.zenith_cli.create_branch("test_subxacts", "empty")
pg = env.postgres.create_start('test_subxacts')
log.info("postgres is running on 'test_subxacts' branch")
pg_conn = pg.connect()

View File

@@ -127,14 +127,12 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
# create folder for remote storage mock
remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage'
(tenant, _) = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209"))
tenant = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209"))
log.info("tenant to relocate %s", tenant)
new_timeline_id = env.zenith_cli.branch_timeline(tenant_id=tenant)
env.zenith_cli.create_branch('test_tenant_relocation', tenant_id=tenant)
tenant_pg = env.postgres.create_start("test_tenant_relocation",
tenant_id=tenant,
timeline_id=new_timeline_id)
tenant_pg = env.postgres.create_start("test_tenant_relocation", tenant_id=tenant)
# insert some data
with closing(tenant_pg.connect()) as conn:

View File

@@ -12,23 +12,21 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce
env = zenith_env_builder.init_start()
"""Tests tenants with and without wal acceptors"""
(tenant_1, initial_timeline_1) = env.zenith_cli.create_tenant()
(tenant_2, initial_timeline_2) = env.zenith_cli.create_tenant()
tenant_1 = env.zenith_cli.create_tenant()
tenant_2 = env.zenith_cli.create_tenant()
new_timeline_tenant_1 = env.zenith_cli.branch_timeline(tenant_id=tenant_1,
ancestor_timeline_id=initial_timeline_1)
new_timeline_tenant_2 = env.zenith_cli.branch_timeline(tenant_id=tenant_2,
ancestor_timeline_id=initial_timeline_2)
env.zenith_cli.create_branch(f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}',
tenant_id=tenant_1)
env.zenith_cli.create_branch(f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}',
tenant_id=tenant_2)
pg_tenant1 = env.postgres.create_start(
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}',
tenant_id=tenant_1,
timeline_id=new_timeline_tenant_1,
)
pg_tenant2 = env.postgres.create_start(
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}',
tenant_id=tenant_2,
timeline_id=new_timeline_tenant_2,
)
for pg in [pg_tenant1, pg_tenant2]:

View File

@@ -10,14 +10,13 @@ import time
def test_timeline_size(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Branch at the point where only 100 rows were inserted
new_timeline_id = env.zenith_cli.branch_timeline()
new_timeline_id = env.zenith_cli.create_branch('test_timeline_size', 'empty')
client = env.pageserver.http_client()
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
print(f'@@@@@@@@@@\n{res}\n@@@@@@@@@@@')
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
pgmain = env.postgres.create_start("test_timeline_size", timeline_id=new_timeline_id)
pgmain = env.postgres.create_start("test_timeline_size")
log.info("postgres is running on 'test_timeline_size' branch")
with closing(pgmain.connect()) as conn:
@@ -69,7 +68,7 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
new_timeline_id = env.zenith_cli.create_branch('test_timeline_size_quota')
client = env.pageserver.http_client()
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
@@ -78,8 +77,7 @@ def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
pgmain = env.postgres.create_start(
"test_timeline_size_quota",
# Set small limit for the test
config_lines=['zenith.max_cluster_size=30MB'],
timeline_id=new_timeline_id)
config_lines=['zenith.max_cluster_size=30MB'])
log.info("postgres is running on 'test_timeline_size_quota' branch")
with closing(pgmain.connect()) as conn:

View File

@@ -9,10 +9,8 @@ from fixtures.log_helper import log
#
def test_twophase(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
test_twophase_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_twophase',
config_lines=['max_prepared_transactions=5'],
timeline_id=test_twophase_timeline_id)
env.zenith_cli.create_branch("test_twophase", "empty")
pg = env.postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5'])
log.info("postgres is running on 'test_twophase' branch")
conn = pg.connect()
@@ -57,14 +55,12 @@ def test_twophase(zenith_simple_env: ZenithEnv):
assert len(twophase_files) == 2
# Create a branch with the transaction in prepared state
test_twophase_prepared_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_twophase_timeline_id)
env.zenith_cli.create_branch("test_twophase_prepared", "test_twophase")
# Start compute on the new branch
pg2 = env.postgres.create_start(
'test_twophase_prepared',
config_lines=['max_prepared_transactions=5'],
timeline_id=test_twophase_prepared_timeline_id,
)
# Check that we restored only needed twophase files

View File

@@ -9,8 +9,8 @@ from fixtures.log_helper import log
def test_vm_bit_clear(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
test_vm_bit_clear_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_vm_bit_clear', timeline_id=test_vm_bit_clear_timeline_id)
env.zenith_cli.create_branch("test_vm_bit_clear", "empty")
pg = env.postgres.create_start('test_vm_bit_clear')
log.info("postgres is running on 'test_vm_bit_clear' branch")
pg_conn = pg.connect()
@@ -33,8 +33,7 @@ def test_vm_bit_clear(zenith_simple_env: ZenithEnv):
cur.execute('UPDATE vmtest_update SET id = 5000 WHERE id = 1')
# Branch at this point, to test that later
test_vm_bit_clear_new_timeline_id = env.zenith_cli.branch_timeline(
ancestor_timeline_id=test_vm_bit_clear_timeline_id)
env.zenith_cli.create_branch("test_vm_bit_clear_new", "test_vm_bit_clear")
# Clear the buffer cache, to force the VM page to be re-fetched from
# the page server
@@ -62,8 +61,7 @@ def test_vm_bit_clear(zenith_simple_env: ZenithEnv):
# a dirty VM page is evicted. If the VM bit was not correctly cleared by the
# earlier WAL record, the full-page image hides the problem. Starting a new
# server at the right point-in-time avoids that full-page image.
pg_new = env.postgres.create_start('test_vm_bit_clear_new',
timeline_id=test_vm_bit_clear_new_timeline_id)
pg_new = env.postgres.create_start('test_vm_bit_clear_new')
log.info("postgres is running on 'test_vm_bit_clear_new' branch")
pg_new_conn = pg_new.connect()

View File

@@ -24,8 +24,8 @@ def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_wal_acceptors_normal_work', timeline_id=new_timeline_id)
env.zenith_cli.create_branch('test_wal_acceptors_normal_work')
pg = env.postgres.create_start('test_wal_acceptors_normal_work')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
@@ -62,8 +62,8 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
# start postgres on each timeline
pgs = []
for branch_name in branch_names:
new_timeline_id = env.zenith_cli.branch_timeline()
pgs.append(env.postgres.create_start(branch_name, timeline_id=new_timeline_id))
new_timeline_id = env.zenith_cli.create_branch(branch_name)
pgs.append(env.postgres.create_start(branch_name))
branch_names_to_timeline_ids[branch_name] = new_timeline_id
tenant_id = env.initial_tenant
@@ -87,7 +87,6 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
timeline_metrics = []
with env.pageserver.http_client() as pageserver_http:
for timeline_detail in timeline_details:
print(f"@@@@@@@@@@@\n{timeline_detail}\n@@@@@@@@@@@")
timeline_id: str = timeline_detail["timeline_id"]
m = TimelineMetrics(
@@ -188,8 +187,8 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = n_acceptors
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_wal_acceptors_restarts', timeline_id=new_timeline_id)
env.zenith_cli.create_branch('test_wal_acceptors_restarts')
pg = env.postgres.create_start('test_wal_acceptors_restarts')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -225,8 +224,8 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 2
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_wal_acceptors_unavailability', timeline_id=new_timeline_id)
env.zenith_cli.create_branch('test_wal_acceptors_unavailability')
pg = env.postgres.create_start('test_wal_acceptors_unavailability')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -296,9 +295,8 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_wal_acceptors_race_conditions',
timeline_id=new_timeline_id)
env.zenith_cli.create_branch('test_wal_acceptors_race_conditions')
pg = env.postgres.create_start('test_wal_acceptors_race_conditions')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -462,8 +460,8 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_timeline_status', timeline_id=new_timeline_id)
env.zenith_cli.create_branch('test_timeline_status')
pg = env.postgres.create_start('test_timeline_status')
wa = env.safekeepers[0]
wa_http_cli = wa.http_client()
@@ -636,12 +634,12 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 4
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
env.zenith_cli.create_branch('test_replace_safekeeper')
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
active_safekeepers = [1, 2, 3]
pg = env.postgres.create('test_replace_safekeeper', timeline_id=new_timeline_id)
pg = env.postgres.create('test_replace_safekeeper')
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
pg.start()
@@ -679,7 +677,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Recreate postgres to replace failed sk1 with new sk4")
pg.stop_and_destroy().create('test_replace_safekeeper', timeline_id=uuid.UUID(timeline_id))
pg.stop_and_destroy().create('test_replace_safekeeper')
active_safekeepers = [2, 3, 4]
env.safekeepers[3].start()
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))

View File

@@ -202,9 +202,8 @@ def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.branch_timeline()
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load',
timeline_id=new_timeline_id)
env.zenith_cli.create_branch('test_wal_acceptors_restarts_under_load')
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load')
asyncio.run(run_restarts_under_load(pg, env.safekeepers))

View File

@@ -36,11 +36,12 @@ def test_cli_timeline_list(zenith_simple_env: ZenithEnv):
helper_compare_timeline_list(pageserver_http_client, env, env.initial_tenant)
# Create a branch for us
main_timeline_id = env.zenith_cli.branch_timeline()
main_timeline_id = env.zenith_cli.create_branch('test_cli_branch_list_main')
helper_compare_timeline_list(pageserver_http_client, env, env.initial_tenant)
# Create a nested branch
nested_timeline_id = env.zenith_cli.branch_timeline(ancestor_timeline_id=main_timeline_id)
nested_timeline_id = env.zenith_cli.create_branch('test_cli_branch_list_nested',
'test_cli_branch_list_main')
helper_compare_timeline_list(pageserver_http_client, env, env.initial_tenant)
# Check that all new branches are visible via CLI
@@ -67,15 +68,13 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
helper_compare_tenant_list(pageserver_http_client, env)
# Create new tenant
tenant1 = uuid.uuid4()
env.zenith_cli.create_tenant(tenant_id=tenant1)
tenant1 = env.zenith_cli.create_tenant()
# check tenant1 appeared
helper_compare_tenant_list(pageserver_http_client, env)
# Create new tenant
tenant2 = uuid.uuid4()
env.zenith_cli.create_tenant(tenant_id=tenant2)
tenant2 = env.zenith_cli.create_tenant()
# check tenant2 appeared
helper_compare_tenant_list(pageserver_http_client, env)

View File

@@ -7,12 +7,10 @@ from fixtures.zenith_fixtures import ZenithEnv, base_dir, pg_distrib_dir
def test_isolation(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
env.zenith_cli.create_branch("test_isolation", "empty")
# Connect to postgres and create a database called "regression".
# isolation tests use prepared transactions, so enable them
pg = env.postgres.create_start('test_isolation',
config_lines=['max_prepared_transactions=100'],
timeline_id=new_timeline_id)
pg = env.postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100'])
pg.safe_psql('CREATE DATABASE isolation_regression')
# Create some local directories for pg_isolation_regress to run in.

View File

@@ -7,9 +7,9 @@ from fixtures.zenith_fixtures import ZenithEnv, check_restored_datadir_content,
def test_pg_regress(zenith_simple_env: ZenithEnv, test_output_dir: str, pg_bin, capsys):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
env.zenith_cli.create_branch("test_pg_regress", "empty")
# Connect to postgres and create a database called "regression".
pg = env.postgres.create_start('test_pg_regress', timeline_id=new_timeline_id)
pg = env.postgres.create_start('test_pg_regress')
pg.safe_psql('CREATE DATABASE regression')
# Create some local directories for pg_regress to run in.

View File

@@ -11,9 +11,9 @@ from fixtures.log_helper import log
def test_zenith_regress(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
env.zenith_cli.create_branch("test_zenith_regress", "empty")
# Connect to postgres and create a database called "regression".
pg = env.postgres.create_start('test_zenith_regress', timeline_id=new_timeline_id)
pg = env.postgres.create_start('test_zenith_regress')
pg.safe_psql('CREATE DATABASE regression')
# Create some local directories for pg_regress to run in.

View File

@@ -64,8 +64,8 @@ class ZenithCompare(PgCompare):
self._pg_bin = pg_bin
# We only use one branch and one timeline
timeline_id = self.env.zenith_cli.branch_timeline()
self._pg = self.env.postgres.create_start("branch", timeline_id=timeline_id)
self.env.zenith_cli.create_branch(branch_name, 'empty')
self._pg = self.env.postgres.create_start(branch_name)
self.timeline = self.pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]
# Long-lived cursor, useful for flushing

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from dataclasses import dataclass, field
from dataclasses import field
import textwrap
from cached_property import cached_property
import asyncpg
@@ -29,7 +29,6 @@ from dataclasses import dataclass
from psycopg2.extensions import connection as PgConnection
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, TypeVar, cast, Union, Tuple
from typing_extensions import Literal
import pytest
import requests
import backoff # type: ignore
@@ -219,7 +218,7 @@ def can_bind(host: str, port: int) -> bool:
class PortDistributor:
def __init__(self, base_port: int, port_number: int) -> None:
def __init__(self, base_port: int, port_number: int):
self.iterator = iter(range(base_port, base_port + port_number))
def get_port(self) -> int:
@@ -424,7 +423,8 @@ class ZenithEnvBuilder:
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 0,
pageserver_auth_enabled: bool = False,
rust_log_override: Optional[str] = None):
rust_log_override: Optional[str] = None,
default_branch_name='main'):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
@@ -432,6 +432,7 @@ class ZenithEnvBuilder:
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.pageserver_auth_enabled = pageserver_auth_enabled
self.default_branch_name = default_branch_name
self.env: Optional[ZenithEnv] = None
self.s3_mock_server: Optional[MockS3Server] = None
@@ -536,7 +537,7 @@ class ZenithEnv:
initial_tenant - tenant ID of the initial tenant created in the repository
zenith_cli() - zenith_cli() can be used to run the 'zenith' CLI tool
zenith_cli - can be used to run the 'zenith' CLI tool
create_tenant() - initializes a new tenant in the page server, returns
the tenant id
@@ -546,9 +547,9 @@ class ZenithEnv:
self.rust_log_override = config.rust_log_override
self.port_distributor = config.port_distributor
self.s3_mock_server = config.s3_mock_server
self.default_branch_name = config.default_branch_name
self.zenith_cli = ZenithCli(env=self)
self.zenith_cli = ZenithCli(env=self)
self.postgres = PostgresFactory(self)
self.safekeepers: List[Safekeeper] = []
# generate initial tenant ID here instead of letting 'zenith init' generate it,
@@ -599,9 +600,7 @@ class ZenithEnv:
self.safekeepers.append(safekeeper)
log.info(f"Config: {toml}")
# TODO kb is this a wrong concept? will break for multiple tenant tests
self.initial_timeline = self.zenith_cli.init(toml)
self.postgres = PostgresFactory(self)
self.zenith_cli.init(toml)
def start(self):
# Start up the page server and all the safekeepers
@@ -637,7 +636,12 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
shutil.rmtree(repo_dir, ignore_errors=True)
with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
yield builder.init_start()
env = builder.init_start()
# For convenience in tests, create a branch from the freshly-initialized cluster.
env.zenith_cli.create_branch("empty")
yield env
@pytest.fixture(scope='function')
@@ -685,7 +689,7 @@ class ZenithPageserverApiException(Exception):
class ZenithPageserverHttpClient(requests.Session):
def __init__(self, port: int, auth_token: Optional[str] = None) -> None:
def __init__(self, port: int, auth_token: Optional[str] = None):
super().__init__()
self.port = port
self.auth_token = auth_token
@@ -804,59 +808,49 @@ class ZenithCli:
A typed wrapper around the `zenith` CLI tool.
Supports main commands via typed methods and a way to run arbitrary command directly via CLI.
"""
def __init__(self, env: ZenithEnv) -> None:
def __init__(self, env: ZenithEnv):
self.env = env
pass
def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> tuple[uuid.UUID, uuid.UUID]:
def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
"""
Creates a new tenant, returns its id and its initial timeline's id.
"""
if tenant_id is None:
tenant_id = uuid.uuid4()
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
initial_timeline_id_extractor = re.compile(r"initial timeline: '(?P<timeline_id>[^']+)'",
re.MULTILINE)
matches = initial_timeline_id_extractor.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group('timeline_id')
if created_timeline_id is None:
raise Exception('could not find timeline id after `zenith tenant create` invocation')
else:
return (tenant_id, uuid.UUID(created_timeline_id))
res.check_returncode()
return tenant_id
def list_tenants(self) -> 'subprocess.CompletedProcess[str]':
res = self.raw_cli(['tenant', 'list'])
res.check_returncode()
return res
def branch_timeline(self,
tenant_id: Optional[uuid.UUID] = None,
new_timeline_id: Optional[uuid.UUID] = None,
ancestor_timeline_id: Optional[uuid.UUID] = None,
ancestor_start_lsn: Optional[str] = None) -> uuid.UUID:
def create_branch(self,
new_branch_name: str,
ancestor_branch_name: Optional[str] = None,
tenant_id: Optional[uuid.UUID] = None,
ancestor_start_lsn: Optional[str] = None) -> uuid.UUID:
cmd = [
'timeline',
'branch',
'--name',
new_branch_name,
'--tenant-id',
(tenant_id or self.env.initial_tenant).hex,
'--ancestor-timeline-id',
(ancestor_timeline_id or self.env.initial_timeline).hex,
'--ancestor-branch-name',
ancestor_branch_name or self.env.default_branch_name,
]
if ancestor_start_lsn is not None:
cmd.extend(['--ancestor-start-lsn', ancestor_start_lsn])
if new_timeline_id is not None:
cmd.extend(['--timeline-id', new_timeline_id.hex])
completed_process = self.raw_cli(cmd)
completed_process.check_returncode()
res = self.raw_cli(cmd)
res.check_returncode()
create_timeline_id_extractor = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'",
re.MULTILINE)
matches = create_timeline_id_extractor.search(completed_process.stdout)
matches = create_timeline_id_extractor.search(res.stdout)
created_timeline_id = None
if matches is not None:
@@ -875,9 +869,7 @@ class ZenithCli:
res.stdout.strip().split("\n")))
return branches_cli
def init(self, config_toml: str) -> uuid.UUID:
initial_timeline = None
def init(self, config_toml: str) -> 'subprocess.CompletedProcess[str]':
with tempfile.NamedTemporaryFile(mode='w+') as tmp:
tmp.write(config_toml)
tmp.flush()
@@ -887,18 +879,9 @@ class ZenithCli:
self.env.pageserver.remote_storage,
self.env.pageserver.config_override)
completed_process = self.raw_cli(cmd)
completed_process.check_returncode()
init_timeline_id_extractor = re.compile(
r'^created initial timeline (?P<timeline_id>[^\s]+)\s', re.MULTILINE)
matches = init_timeline_id_extractor.search(completed_process.stdout)
if matches is not None:
initial_timeline = matches.group('timeline_id')
if initial_timeline is None:
raise Exception('could not find timeline id after `zenith init` invocation')
else:
return uuid.UUID(initial_timeline)
res = self.raw_cli(cmd)
res.check_returncode()
return res
def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
start_args = ['pageserver', 'start', *overrides]
@@ -930,9 +913,8 @@ class ZenithCli:
def pg_create(
self,
node_name: str,
branch_name: str,
tenant_id: Optional[uuid.UUID] = None,
timeline_id: Optional[uuid.UUID] = None,
lsn: Optional[str] = None,
port: Optional[int] = None,
) -> 'subprocess.CompletedProcess[str]':
@@ -940,22 +922,21 @@ class ZenithCli:
'pg',
'create',
'--tenant-id', (tenant_id or self.env.initial_tenant).hex,
'--timeline-id', (timeline_id or self.env.initial_timeline).hex
'--name',
branch_name
]
if lsn is not None:
args.append(f'--lsn={lsn}')
if port is not None:
args.append(f'--port={port}')
args.append(node_name)
res = self.raw_cli(args)
res.check_returncode()
return res
def pg_start(
self,
node_name: str,
branch_name: str,
tenant_id: Optional[uuid.UUID] = None,
timeline_id: Optional[uuid.UUID] = None,
lsn: Optional[str] = None,
port: Optional[int] = None,
) -> 'subprocess.CompletedProcess[str]':
@@ -964,14 +945,13 @@ class ZenithCli:
'start',
'--tenant-id',
(tenant_id or self.env.initial_tenant).hex,
'--timeline-id',
(timeline_id or self.env.initial_timeline).hex,
'--name',
branch_name,
]
if lsn is not None:
args.append(f'--lsn={lsn}')
if port is not None:
args.append(f'--port={port}')
args.append(node_name)
res = self.raw_cli(args)
res.check_returncode()
@@ -979,14 +959,19 @@ class ZenithCli:
def pg_stop(
self,
node_name: str,
branch_name: str,
tenant_id: Optional[uuid.UUID] = None,
destroy=False,
) -> 'subprocess.CompletedProcess[str]':
args = ['pg', 'stop', f'--tenant-id={(tenant_id or self.env.initial_tenant).hex}']
args = [
'pg',
'stop',
f'--tenant-id={(tenant_id or self.env.initial_tenant).hex}',
'--name',
branch_name
]
if destroy:
args.append('--destroy')
args.append(node_name)
return self.raw_cli(args)
@@ -1061,8 +1046,7 @@ class ZenithPageserver(PgProtocol):
env: ZenithEnv,
port: PageserverPort,
remote_storage: Optional[RemoteStorage] = None,
config_override: Optional[str] = None,
enable_auth=False):
config_override: Optional[str] = None):
super().__init__(host='localhost', port=port.pg, username='zenith_admin')
self.env = env
self.running = False
@@ -1150,7 +1134,7 @@ class PgBin:
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(str(pg_distrib_dir), 'lib')
def _fixpath(self, command: List[str]) -> None:
def _fixpath(self, command: List[str]):
if '/' not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
@@ -1161,7 +1145,7 @@ class PgBin:
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None):
"""
Run one of the postgres binaries.
@@ -1211,18 +1195,18 @@ class VanillaPostgres(PgProtocol):
self.running = False
self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
def configure(self, options: List[str]) -> None:
def configure(self, options: List[str]):
"""Append lines into postgresql.conf file."""
assert not self.running
with open(os.path.join(self.pgdatadir, 'postgresql.conf'), 'a') as conf_file:
conf_file.writelines(options)
def start(self) -> None:
def start(self):
assert not self.running
self.running = True
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, 'start'])
def stop(self) -> None:
def stop(self):
assert self.running
self.running = False
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, 'stop'])
@@ -1298,15 +1282,14 @@ class Postgres(PgProtocol):
self.env = env
self.running = False
self.node_name: Optional[str] = None # dubious, see asserts below
self.branch_name: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf
def create(
self,
node_name: str,
timeline_id: uuid.UUID,
branch_name: str,
lsn: Optional[str] = None,
config_lines: Optional[List[str]] = None,
) -> 'Postgres':
@@ -1318,13 +1301,12 @@ class Postgres(PgProtocol):
if not config_lines:
config_lines = []
self.env.zenith_cli.pg_create(node_name,
timeline_id=timeline_id,
self.env.zenith_cli.pg_create(branch_name,
tenant_id=self.tenant_id,
lsn=lsn,
port=self.port)
self.node_name = node_name
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id.hex / self.node_name
self.branch_name = branch_name
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id.hex / self.branch_name
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
if config_lines is None:
@@ -1343,11 +1325,11 @@ class Postgres(PgProtocol):
Returns self.
"""
assert self.node_name is not None
assert self.branch_name is not None
log.info(f"Starting postgres node {self.node_name}")
log.info(f"Starting postgres node {self.branch_name}")
run_result = self.env.zenith_cli.pg_start(self.node_name,
run_result = self.env.zenith_cli.pg_start(self.branch_name,
tenant_id=self.tenant_id,
port=self.port)
self.running = True
@@ -1358,8 +1340,8 @@ class Postgres(PgProtocol):
def pg_data_dir_path(self) -> str:
""" Path to data directory """
assert self.node_name
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id.hex / self.node_name
assert self.branch_name
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id.hex / self.branch_name
return os.path.join(self.env.repo_dir, path)
def pg_xact_dir_path(self) -> str:
@@ -1418,8 +1400,8 @@ class Postgres(PgProtocol):
"""
if self.running:
assert self.node_name is not None
self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id)
assert self.branch_name is not None
self.env.zenith_cli.pg_stop(self.branch_name, self.tenant_id)
self.running = False
return self
@@ -1430,16 +1412,15 @@ class Postgres(PgProtocol):
Returns self.
"""
assert self.node_name is not None
self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id, True)
self.node_name = None
assert self.branch_name is not None
self.env.zenith_cli.pg_stop(self.branch_name, self.tenant_id, True)
self.branch_name = None
return self
def create_start(
self,
node_name: str,
timeline_id: uuid.UUID,
branch_name: str,
lsn: Optional[str] = None,
config_lines: Optional[List[str]] = None,
) -> 'Postgres':
@@ -1450,8 +1431,7 @@ class Postgres(PgProtocol):
"""
self.create(
node_name=node_name,
timeline_id=timeline_id,
branch_name=branch_name,
config_lines=config_lines,
lsn=lsn,
).start()
@@ -1473,9 +1453,8 @@ class PostgresFactory:
self.instances: List[Postgres] = []
def create_start(self,
node_name: str = "main",
branch_name: Optional[str] = None,
tenant_id: Optional[uuid.UUID] = None,
timeline_id: Optional[uuid.UUID] = None,
lsn: Optional[str] = None,
config_lines: Optional[List[str]] = None) -> Postgres:
@@ -1488,16 +1467,14 @@ class PostgresFactory:
self.instances.append(pg)
return pg.create_start(
node_name=node_name,
timeline_id=timeline_id or self.env.initial_timeline,
branch_name=branch_name or self.env.default_branch_name,
config_lines=config_lines,
lsn=lsn,
)
def create(self,
node_name: str = "main",
branch_name: Optional[str] = None,
tenant_id: Optional[uuid.UUID] = None,
timeline_id: Optional[uuid.UUID] = None,
lsn: Optional[str] = None,
config_lines: Optional[List[str]] = None) -> Postgres:
@@ -1511,8 +1488,7 @@ class PostgresFactory:
self.instances.append(pg)
return pg.create(
node_name=node_name,
timeline_id=timeline_id or self.env.initial_timeline,
branch_name=branch_name or self.env.default_branch_name,
lsn=lsn,
config_lines=config_lines,
)
@@ -1616,7 +1592,7 @@ class SafekeeperMetrics:
class SafekeeperHttpClient(requests.Session):
def __init__(self, port: int) -> None:
def __init__(self, port: int):
super().__init__()
self.port = port
@@ -1743,7 +1719,7 @@ def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Pos
pg.stop()
# Take a basebackup from pageserver
restored_dir_path = os.path.join(env.repo_dir, f"{pg.node_name}_restored_datadir")
restored_dir_path = os.path.join(env.repo_dir, f"{pg.branch_name}_restored_datadir")
mkdir_if_needed(restored_dir_path)
pg_bin = PgBin(test_output_dir)

View File

@@ -30,18 +30,16 @@ def test_bulk_tenant_create(
for i in range(tenants_count):
start = timeit.default_timer()
(tenant, tenant_initial_timeline_id) = env.zenith_cli.create_tenant()
new_timeline_id = env.zenith_cli.branch_timeline(
tenant_id=tenant, ancestor_timeline_id=tenant_initial_timeline_id)
tenant = env.zenith_cli.create_tenant()
env.zenith_cli.create_branch(
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant_id=tenant)
# FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now?
#if use_wal_acceptors == 'with_wa':
# wa_factory.start_n_new(3)
pg_tenant = env.postgres.create_start(
f"test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}",
tenant,
timeline_id=new_timeline_id)
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant)
end = timeit.default_timer()
time_slices.append(end - start)

View File

@@ -1,5 +1,6 @@
from io import BytesIO
import asyncio
import asyncpg
from fixtures.zenith_fixtures import ZenithEnv, Postgres, PgProtocol
from fixtures.log_helper import log
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker

View File

@@ -21,8 +21,8 @@ run_broken = pytest.mark.skipif(os.environ.get('RUN_BROKEN') is None,
def test_broken(zenith_simple_env: ZenithEnv, pg_bin):
env = zenith_simple_env
new_timeline_id = env.zenith_cli.branch_timeline()
env.postgres.create_start("test_broken", timeline_id=new_timeline_id)
env.zenith_cli.create_branch("test_broken", "empty")
env.postgres.create_start("test_broken")
log.info('postgres is running')
log.info('THIS NEXT COMMAND WILL FAIL:')

View File

@@ -1,4 +1,4 @@
use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use clap::{App, AppSettings, Arg, ArgMatches};
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env;
@@ -19,7 +19,7 @@ use walkeeper::defaults::{
use zenith_utils::auth::{Claims, Scope};
use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId};
use zenith_utils::GIT_VERSION;
use pageserver::timelines::TimelineInfo;
@@ -27,6 +27,7 @@ use pageserver::timelines::TimelineInfo;
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1);
const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
fn default_conf() -> String {
format!(
@@ -57,7 +58,7 @@ http_port = {safekeeper_http_port}
/// Timelines tree element used as a value in the HashMap.
///
struct TimelineTreeEl {
/// `TimelineInfo` received from the `pageserver` via the `timeline_list` libpq API call.
/// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
pub info: TimelineInfo,
/// Holds all direct children of this timeline referenced using `timeline_id`.
pub children: BTreeSet<ZTimelineId>,
@@ -71,16 +72,15 @@ struct TimelineTreeEl {
// * Providing CLI api to the pageserver
// * TODO: export/import to/from usual postgres
fn main() -> Result<()> {
let pg_node_arg = Arg::new("node").help("Node name").required(true);
let branch_name_arg = Arg::new("name")
.long("name")
.short('n')
.takes_value(true)
.help("Name of the branch to be created or used as an alias for other services")
.required(false);
let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
let timeline_id_arg = Arg::new("timeline-id")
.long("timeline-id")
.help("Timeline id. Represented as a hexadecimal string 32 symbols length")
.takes_value(true)
.required(false);
let tenant_id_arg = Arg::new("tenant-id")
.long("tenant-id")
.help("Tenant id. Represented as a hexadecimal string 32 symbols length")
@@ -137,15 +137,15 @@ fn main() -> Result<()> {
.subcommand(App::new("branch")
.about("Create a new timeline, using another timeline as a base, copying its data")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Id of the new timeline, optional. If not specified, it will be generated randomly"))
.arg(Arg::new("ancestor-timeline-id").long("ancestor-timeline-id").takes_value(true)
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline").required(false))
.arg(branch_name_arg.clone())
.arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name").takes_value(true)
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(true))
.arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn").takes_value(true)
.help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
.subcommand(App::new("create")
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Id of the new timeline, optional. If not specified, it will be generated randomly")))
.arg(branch_name_arg.clone()))
).subcommand(
App::new("tenant")
.setting(AppSettings::ArgRequiredElseHelp)
@@ -189,8 +189,7 @@ fn main() -> Result<()> {
.subcommand(App::new("list").arg(tenant_id_arg.clone()))
.subcommand(App::new("create")
.about("Create a postgres compute node")
.arg(pg_node_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone())
@@ -202,14 +201,13 @@ fn main() -> Result<()> {
))
.subcommand(App::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(pg_node_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone()))
.subcommand(
App::new("stop")
.arg(pg_node_arg.clone())
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(
Arg::new("destroy")
@@ -242,24 +240,26 @@ fn main() -> Result<()> {
handle_init(sub_args)
} else {
// all other commands need an existing config
let mut env = match LocalEnv::load_config() {
Ok(conf) => conf,
Err(e) => {
eprintln!("Error loading config: {}", e);
exit(1);
}
};
let mut env = LocalEnv::load_config().context("Error loading config")?;
let original_env = env.clone();
match sub_name {
let subcommand_result = match sub_name {
"tenant" => handle_tenant(sub_args, &mut env),
"timeline" => handle_timeline(sub_args, &env),
"timeline" => handle_timeline(sub_args, &mut env),
"start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
"pg" => handle_pg(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
_ => bail!("unexpected subcommand {}", sub_name),
};
if subcommand_result.is_ok() && original_env != env {
eprintln!("Subcommand had changed the config, updating");
env.persist_config(&env.base_data_dir)?;
}
subcommand_result
};
if let Err(e) = subcmd_result {
eprintln!("command failed: {:#}", e);
@@ -423,21 +423,6 @@ fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::R
}
}
fn get_timeline_id(
sub_match: &ArgMatches,
tenant_id: ZTenantId,
env: &local_env::LocalEnv,
) -> anyhow::Result<ZTimelineId> {
if let Some(timeline_id) = sub_match.value_of("timeline-id") {
Ok(ZTimelineId::from_str(timeline_id)
.context("Failed to parse timeline id from arguments")?)
} else if let Some(&initial_timeline_id) = env.initial_timelines.get(&tenant_id) {
Ok(initial_timeline_id)
} else {
bail!("No timeline id, specify one in the subcommand's arguments");
}
}
fn handle_init(init_match: &ArgMatches) -> Result<()> {
// Create config file
let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
@@ -491,7 +476,10 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
};
println!("using tenant id {}", tenant_id);
let initial_timeline_id = pageserver.tenant_create(tenant_id)?;
env.initial_timelines.insert(tenant_id, initial_timeline_id);
env.branch_name_mappings.insert(
DEFAULT_BRANCH_NAME.to_owned(),
ZTenantTimelineId::new(tenant_id, initial_timeline_id),
);
println!(
"tenant {} successfully created on the pageserver, initial timeline: '{}'",
tenant_id, initial_timeline_id
@@ -503,7 +491,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
Ok(())
}
fn handle_timeline(timeline_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env);
match timeline_match.subcommand() {
@@ -514,18 +502,28 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
}
Some(("create", create_match)) => {
let tenant_id = get_tenant_id(create_match, env)?;
let timeline_id = get_timeline_id(create_match, tenant_id, env)
.unwrap_or_else(|_| ZTimelineId::generate());
let timeline = pageserver.timeline_create(tenant_id, timeline_id, None, None)?;
let new_timeline_id = ZTimelineId::generate();
let new_branch_name = create_match
.value_of("name")
.ok_or(anyhow!("No branch name provided"))?;
let timeline = pageserver.timeline_create(tenant_id, new_timeline_id, None, None)?;
let last_record_lsn = match timeline {
TimelineInfo::Local {
last_record_lsn, ..
} => last_record_lsn,
TimelineInfo::Remote { .. } => {
bail!("Timeline {} was created as remote, not local", timeline_id)
bail!(
"Timeline {} was created as remote, not local",
new_timeline_id
)
}
};
env.branch_name_mappings.insert(
new_branch_name.to_string(),
ZTenantTimelineId::new(tenant_id, new_timeline_id),
);
println!(
"Created timeline '{}' at Lsn {} for tenant: {}",
timeline.timeline_id(),
@@ -535,18 +533,22 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
}
Some(("branch", branch_match)) => {
let tenant_id = get_tenant_id(branch_match, env)?;
let timeline_id = get_timeline_id(branch_match, tenant_id, env)
.unwrap_or_else(|_| ZTimelineId::generate());
let ancestor_timeline_id = match branch_match
.value_of("ancestor-timeline-id")
.map(ZTimelineId::from_str)
.transpose()
.context("Failed to parse ancestor timeline id from the request")?
.or_else(|| env.initial_timelines.get(&tenant_id).copied())
{
Some(id) => id,
None => bail!("No ancestor timeline id provided"),
};
let new_timeline_id = ZTimelineId::generate();
let new_branch_name = branch_match
.value_of("name")
.ok_or(anyhow!("No branch name provided"))?;
let ancestor_branch_name = branch_match
.value_of("ancestor-branch-name")
.ok_or(anyhow!("No ancestor branch name provided"))?;
let ancestor_timeline_id = env
.branch_name_mappings
.get(ancestor_branch_name)
.ok_or(anyhow!(
"Found no timeline id for branch name '{}'",
ancestor_branch_name
))?
.timeline_id;
let start_lsn = branch_match
.value_of("ancestor-start-lsn")
.map(Lsn::from_str)
@@ -554,7 +556,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
.context("Failed to parse ancestor start Lsn from the request")?;
let timeline = pageserver.timeline_create(
tenant_id,
timeline_id,
new_timeline_id,
start_lsn,
Some(ancestor_timeline_id),
)?;
@@ -563,16 +565,23 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
TimelineInfo::Local {
last_record_lsn, ..
} => last_record_lsn,
TimelineInfo::Remote { .. } => {
bail!("Timeline {} was created as remote, not local", timeline_id)
}
TimelineInfo::Remote { .. } => bail!(
"Timeline {} was created as remote, not local",
new_timeline_id
),
};
env.branch_name_mappings.insert(
new_branch_name.to_string(),
ZTenantTimelineId::new(tenant_id, new_timeline_id),
);
println!(
"Created timeline '{}' at Lsn {} for tenant: {}. Ancestor timeline: '{}'",
timeline.timeline_id(),
last_record_lsn,
tenant_id,
ancestor_timeline_id,
ancestor_branch_name,
);
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
@@ -592,6 +601,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// All subcommands take an optional --tenant-id option
let tenant_id = get_tenant_id(sub_args, env)?;
let node_name = sub_args.value_of("name").unwrap_or(DEFAULT_BRANCH_NAME);
match sub_name {
"list" => {
@@ -630,13 +640,16 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
}
}
"create" => {
let node_name = sub_args.value_of("node").unwrap_or("main");
let lsn = sub_args
.value_of("lsn")
.map(Lsn::from_str)
.transpose()
.context("Failed to parse Lsn from the request")?;
let timeline_id = get_timeline_id(sub_args, tenant_id, env)?;
let timeline_id = env
.branch_name_mappings
.get(node_name)
.ok_or(anyhow!("Found no timeline id for node name {}", node_name))?
.timeline_id;
let port: Option<u16> = match sub_args.value_of("port") {
Some(p) => Some(p.parse()?),
@@ -645,8 +658,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port)?;
}
"start" => {
let node_name = sub_args.value_of("node").unwrap_or("main");
let port: Option<u16> = match sub_args.value_of("port") {
Some(p) => Some(p.parse()?),
None => None,
@@ -666,7 +677,11 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
println!("Starting existing postgres {}...", node_name);
node.start(&auth_token)?;
} else {
let timeline_id = get_timeline_id(sub_args, tenant_id, env)?;
let timeline_id = env
.branch_name_mappings
.get(node_name)
.ok_or(anyhow!("Found no timeline id for node name {}", node_name))?
.timeline_id;
let lsn = sub_args
.value_of("lsn")
.map(Lsn::from_str)
@@ -686,7 +701,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
}
}
"stop" => {
let node_name = sub_args.value_of("node").unwrap_or("main");
let destroy = sub_args.is_present("destroy");
let node = cplane

View File

@@ -317,7 +317,7 @@ zid_newtype!(ZTenantId);
mutual_from!(ZTenantId, HexZTenantId);
// A pair uniquely identifying Zenith instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ZTenantTimelineId {
pub tenant_id: ZTenantId,
pub timeline_id: ZTimelineId,