Properly store the branch name mappings

Author: Kirill Bulatov
Date: 2022-02-24 23:28:30 +02:00
Committed by: Kirill Bulatov
parent c7569dce47
commit 7b5482bac0
10 changed files with 158 additions and 90 deletions

View File

@@ -37,7 +37,7 @@ impl ComputeControlPlane {
     //   pgdatadirs
     //   |- tenants
     //   |  |- <tenant_id>
-    //   |  |  |- <timeline_id>
+    //   |  |  |- <node name>
     pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
         let pageserver = Arc::new(PageServerNode::from_env(&env));

View File

@@ -12,7 +12,8 @@ use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
 use zenith_utils::postgres_backend::AuthType;
-use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId, ZTenantTimelineId};
+use zenith_utils::zid::ZTimelineId;
+use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId};

 use crate::safekeeper::SafekeeperNode;
@@ -62,7 +63,10 @@ pub struct LocalEnv {
     /// Every tenant has a first timeline created for it, currently the only one ancestor-less for this tenant.
     /// It is used as a default timeline for branching, if no ancestor timeline is specified.
     #[serde(default)]
-    pub branch_name_mappings: HashMap<String, ZTenantTimelineId>,
+    // A `HashMap<String, HashMap<ZTenantId, ZTimelineId>>` would be more appropriate here,
+    // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
+    // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
+    branch_name_mappings: HashMap<String, Vec<(ZTenantId, ZTimelineId)>>,
 }

 #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
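Note on the representation: TOML keys must be strings, so the branch names stay as the outer map's keys and each value flattens to an array of pairs. A hypothetical config fragment under the new scheme (placeholder ids; the exact encoding depends on the `Serialize` impls of `ZTenantId` and `ZTimelineId`):

```toml
[branch_name_mappings]
# one (tenant_id, timeline_id) pair per tenant that has this branch name
main = [["<tenant id>", "<timeline id>"], ["<another tenant id>", "<its timeline id>"]]
```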
@@ -149,6 +153,30 @@ impl LocalEnv {
         self.base_data_dir.join("safekeepers").join(data_dir_name)
     }

+    pub fn register_branch_mapping(
+        &mut self,
+        branch_name: String,
+        tenant_id: ZTenantId,
+        timeline_id: ZTimelineId,
+    ) {
+        self.branch_name_mappings
+            .entry(branch_name)
+            .or_default()
+            .push((tenant_id, timeline_id));
+    }
+
+    pub fn get_branch_timeline_id(
+        &self,
+        branch_name: &str,
+        tenant_id: ZTenantId,
+    ) -> Option<ZTimelineId> {
+        self.branch_name_mappings
+            .get(branch_name)?
+            .iter()
+            .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
+            .map(|&(_, timeline_id)| timeline_id)
+    }
+
     /// Create a LocalEnv from a config file.
     ///
     /// Unlike 'load_config', this function fills in any defaults that are missing
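These two new methods are what make branch names per-tenant instead of global: the same name can now be registered under several tenants and resolved against a specific one. A minimal self-contained sketch of that behavior, with `u32` stand-ins for the real `ZTenantId`/`ZTimelineId` types:

```rust
use std::collections::HashMap;

// Stand-ins for ZTenantId / ZTimelineId from zenith_utils::zid.
type TenantId = u32;
type TimelineId = u32;

#[derive(Default)]
struct Env {
    branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}

impl Env {
    // Same shape as LocalEnv::register_branch_mapping above.
    fn register_branch_mapping(&mut self, branch: String, tenant: TenantId, timeline: TimelineId) {
        self.branch_name_mappings.entry(branch).or_default().push((tenant, timeline));
    }

    // Same shape as LocalEnv::get_branch_timeline_id above.
    fn get_branch_timeline_id(&self, branch: &str, tenant: TenantId) -> Option<TimelineId> {
        self.branch_name_mappings
            .get(branch)?
            .iter()
            .find(|(mapped_tenant, _)| *mapped_tenant == tenant)
            .map(|&(_, timeline)| timeline)
    }
}

fn main() {
    let mut env = Env::default();
    // 'main' exists in two tenants; neither registration clobbers the other.
    env.register_branch_mapping("main".to_owned(), 1, 100);
    env.register_branch_mapping("main".to_owned(), 2, 200);
    assert_eq!(env.get_branch_timeline_id("main", 1), Some(100));
    assert_eq!(env.get_branch_timeline_id("main", 2), Some(200));
    assert_eq!(env.get_branch_timeline_id("main", 3), None);
}
```

Under the old `HashMap<String, ZTenantTimelineId>`, a second `insert` for "main" would have silently overwritten the first tenant's entry.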

View File

@@ -201,7 +201,7 @@ pub fn create_repo(
     info!("created directory structure in {}", repo_dir.display());

     // create a new timeline directory
-    let timeline_id = init_timeline_id.unwrap_or_else(|| ZTimelineId::generate());
+    let timeline_id = init_timeline_id.unwrap_or_else(ZTimelineId::generate);
     let timelinedir = conf.timeline_path(&timeline_id, &tenant_id);
     crashsafe_dir::create_dir(&timelinedir)?;
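This hunk is the standard clippy `redundant_closure` cleanup rather than a behavior change: a zero-argument function item can be passed to `unwrap_or_else` directly. A standalone illustration:

```rust
fn generate() -> u64 {
    42 // stand-in for ZTimelineId::generate()
}

fn main() {
    // A closure that only forwards to the function is redundant...
    let a: u64 = None.unwrap_or_else(|| generate());
    // ...since the function item already matches the expected FnOnce() -> T.
    let b: u64 = None.unwrap_or_else(generate);
    assert_eq!(a, b);
}
```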

View File

@@ -52,10 +52,14 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
     log.info('LSN after 400100 rows: ' + lsn_c)

     # Create first read-only node at the point where only 100 rows were inserted
-    pg_hundred = env.postgres.create_start("test_readonly_node_hundred", lsn=lsn_a)
+    pg_hundred = env.postgres.create_start(branch_name='test_readonly_node',
+                                           node_name='test_readonly_node_hundred',
+                                           lsn=lsn_a)

     # And another at the point where 200100 rows were inserted
-    pg_more = env.postgres.create_start("test_readonly_node_more", lsn=lsn_b)
+    pg_more = env.postgres.create_start(branch_name='test_readonly_node',
+                                        node_name='test_readonly_node_more',
+                                        lsn=lsn_b)

     # On the 'hundred' node, we should see only 100 rows
     hundred_pg_conn = pg_hundred.connect()
@@ -74,7 +78,9 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
     assert main_cur.fetchone() == (400100, )

     # Check creating a node at segment boundary
-    pg = env.postgres.create_start("test_branch_segment_boundary", lsn='0/3000000')
+    pg = env.postgres.create_start(branch_name='test_readonly_node',
+                                   node_name='test_branch_segment_boundary',
+                                   lsn='0/3000000')
     cur = pg.connect().cursor()
     cur.execute('SELECT 1')
     assert cur.fetchone() == (1, )
@@ -82,4 +88,6 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
     # Create node at pre-initdb lsn
     with pytest.raises(Exception, match="invalid basebackup lsn"):
         # compute node startup with invalid LSN should fail
-        env.zenith_cli.pg_start("test_readonly_node_preinitdb", lsn="0/42")
+        env.postgres.create_start(branch_name='test_readonly_node',
+                                  node_name='test_readonly_node_preinitdb',
+                                  lsn='0/42')

View File

@@ -43,7 +43,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
     ##### First start, insert secret data and upload it to the remote storage
     env = zenith_env_builder.init_start()
-    pg = env.postgres.create_start()
+    pg = env.postgres.create_start('main')

     tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
     timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
@@ -94,7 +94,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
             log.debug("still waiting")
             time.sleep(1)

-    pg = env.postgres.create_start()
+    pg = env.postgres.create_start('main')
     with closing(pg.connect()) as conn:
         with conn.cursor() as cur:
             cur.execute(f'SELECT secret FROM t1 WHERE id = {data_id};')

View File

@@ -132,7 +132,9 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
     env.zenith_cli.create_branch('test_tenant_relocation', tenant_id=tenant)
-    tenant_pg = env.postgres.create_start("test_tenant_relocation", tenant_id=tenant)
+    tenant_pg = env.postgres.create_start(branch_name='main',
+                                          node_name='test_tenant_relocation',
+                                          tenant_id=tenant)

     # insert some data
     with closing(tenant_pg.connect()) as conn:

View File

@@ -835,7 +835,7 @@ class ZenithCli:
         cmd = [
             'timeline',
             'branch',
-            '--name',
+            '--branch-name',
             new_branch_name,
             '--tenant-id',
             (tenant_id or self.env.initial_tenant).hex,
@@ -918,6 +918,7 @@ class ZenithCli:
     def pg_create(
         self,
         branch_name: str,
+        node_name: Optional[str] = None,
         tenant_id: Optional[uuid.UUID] = None,
         lsn: Optional[str] = None,
         port: Optional[int] = None,
@@ -925,21 +926,25 @@ class ZenithCli:
         args = [
             'pg',
             'create',
-            '--tenant-id', (tenant_id or self.env.initial_tenant).hex,
-            '--name',
-            branch_name
+            '--tenant-id',
+            (tenant_id or self.env.initial_tenant).hex,
+            '--branch-name',
+            branch_name,
         ]
         if lsn is not None:
-            args.append(f'--lsn={lsn}')
+            args.extend(['--lsn', lsn])
         if port is not None:
-            args.append(f'--port={port}')
+            args.extend(['--port', str(port)])
+        if node_name is not None:
+            args.append(node_name)
         res = self.raw_cli(args)
         res.check_returncode()
         return res
     def pg_start(
         self,
-        branch_name: str,
+        node_name: str,
         tenant_id: Optional[uuid.UUID] = None,
         lsn: Optional[str] = None,
         port: Optional[int] = None,
@@ -949,13 +954,13 @@ class ZenithCli:
             'start',
             '--tenant-id',
             (tenant_id or self.env.initial_tenant).hex,
-            '--name',
-            branch_name,
         ]
         if lsn is not None:
             args.append(f'--lsn={lsn}')
         if port is not None:
             args.append(f'--port={port}')
+        if node_name is not None:
+            args.append(node_name)
         res = self.raw_cli(args)
         res.check_returncode()
@@ -963,19 +968,20 @@ class ZenithCli:
     def pg_stop(
         self,
-        branch_name: str,
+        node_name: str,
         tenant_id: Optional[uuid.UUID] = None,
         destroy=False,
     ) -> 'subprocess.CompletedProcess[str]':
         args = [
             'pg',
             'stop',
-            f'--tenant-id={(tenant_id or self.env.initial_tenant).hex}',
-            '--name',
-            branch_name
+            '--tenant-id',
+            (tenant_id or self.env.initial_tenant).hex,
         ]
         if destroy:
             args.append('--destroy')
+        if node_name is not None:
+            args.append(node_name)
         return self.raw_cli(args)
@@ -1286,14 +1292,15 @@ class Postgres(PgProtocol):
         self.env = env
         self.running = False
-        self.branch_name: Optional[str] = None  # dubious, see asserts below
+        self.node_name: Optional[str] = None  # dubious, see asserts below
         self.pgdata_dir: Optional[str] = None  # Path to computenode PGDATA
         self.tenant_id = tenant_id
-        # path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf
+        # path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf

     def create(
         self,
         branch_name: str,
+        node_name: Optional[str] = None,
         lsn: Optional[str] = None,
         config_lines: Optional[List[str]] = None,
     ) -> 'Postgres':
@@ -1305,12 +1312,13 @@ class Postgres(PgProtocol):
         if not config_lines:
             config_lines = []

+        self.node_name = node_name or f'{branch_name}_pg_node'
         self.env.zenith_cli.pg_create(branch_name,
+                                      node_name=self.node_name,
                                       tenant_id=self.tenant_id,
                                       lsn=lsn,
                                       port=self.port)
         self.branch_name = branch_name
-        path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id.hex / self.branch_name
+        path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id.hex / self.node_name
         self.pgdata_dir = os.path.join(self.env.repo_dir, path)

         if config_lines is None:
@@ -1329,11 +1337,11 @@ class Postgres(PgProtocol):
         Returns self.
         """
-        assert self.branch_name is not None
+        assert self.node_name is not None

-        log.info(f"Starting postgres node {self.branch_name}")
+        log.info(f"Starting postgres node {self.node_name}")

-        run_result = self.env.zenith_cli.pg_start(self.branch_name,
+        run_result = self.env.zenith_cli.pg_start(self.node_name,
                                                   tenant_id=self.tenant_id,
                                                   port=self.port)
         self.running = True
@@ -1344,8 +1352,8 @@ class Postgres(PgProtocol):
     def pg_data_dir_path(self) -> str:
         """ Path to data directory """
-        assert self.branch_name
-        path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id.hex / self.branch_name
+        assert self.node_name
+        path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id.hex / self.node_name
         return os.path.join(self.env.repo_dir, path)

     def pg_xact_dir_path(self) -> str:
@@ -1404,8 +1412,8 @@ class Postgres(PgProtocol):
         """
         if self.running:
-            assert self.branch_name is not None
-            self.env.zenith_cli.pg_stop(self.branch_name, self.tenant_id)
+            assert self.node_name is not None
+            self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id)
             self.running = False

         return self
@@ -1416,15 +1424,16 @@ class Postgres(PgProtocol):
         Returns self.
         """
-        assert self.branch_name is not None
-        self.env.zenith_cli.pg_stop(self.branch_name, self.tenant_id, True)
-        self.branch_name = None
+        assert self.node_name is not None
+        self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id, True)
+        self.node_name = None
         return self

     def create_start(
         self,
         branch_name: str,
+        node_name: Optional[str] = None,
         lsn: Optional[str] = None,
         config_lines: Optional[List[str]] = None,
     ) -> 'Postgres':
@@ -1436,6 +1445,7 @@ class Postgres(PgProtocol):
         self.create(
             branch_name=branch_name,
+            node_name=node_name,
             config_lines=config_lines,
             lsn=lsn,
         ).start()
@@ -1457,7 +1467,8 @@ class PostgresFactory:
         self.instances: List[Postgres] = []

     def create_start(self,
-                     branch_name: Optional[str] = None,
+                     branch_name: str,
+                     node_name: Optional[str] = None,
                      tenant_id: Optional[uuid.UUID] = None,
                      lsn: Optional[str] = None,
                      config_lines: Optional[List[str]] = None) -> Postgres:
@@ -1471,13 +1482,15 @@ class PostgresFactory:
         self.instances.append(pg)

         return pg.create_start(
-            branch_name=branch_name or self.env.default_branch_name,
+            branch_name=branch_name,
+            node_name=node_name,
             config_lines=config_lines,
             lsn=lsn,
         )

     def create(self,
-               branch_name: Optional[str] = None,
+               branch_name: str,
+               node_name: Optional[str] = None,
                tenant_id: Optional[uuid.UUID] = None,
                lsn: Optional[str] = None,
                config_lines: Optional[List[str]] = None) -> Postgres:
@@ -1492,7 +1505,8 @@ class PostgresFactory:
         self.instances.append(pg)

         return pg.create(
-            branch_name=branch_name or self.env.default_branch_name,
+            branch_name=branch_name,
+            node_name=node_name,
             lsn=lsn,
             config_lines=config_lines,
         )
@@ -1713,6 +1727,7 @@ def list_files_to_compare(pgdata_dir: str):

+# pg is the existing and running compute node, that we want to compare with a basebackup
 def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Postgres):
     # Get the timeline ID. We need it for the 'basebackup' command
     with closing(pg.connect()) as conn:
         with conn.cursor() as cur:
@@ -1723,7 +1738,7 @@ def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Pos
     pg.stop()

     # Take a basebackup from pageserver
-    restored_dir_path = os.path.join(env.repo_dir, f"{pg.branch_name}_restored_datadir")
+    restored_dir_path = os.path.join(env.repo_dir, f"{pg.node_name}_restored_datadir")
     mkdir_if_needed(restored_dir_path)
     pg_bin = PgBin(test_output_dir)

View File

@@ -39,7 +39,7 @@ def test_bulk_tenant_create(
         # wa_factory.start_n_new(3)

         pg_tenant = env.postgres.create_start(
-            f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant)
+            f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant_id=tenant)

         end = timeit.default_timer()
         time_slices.append(end - start)

View File

@@ -19,7 +19,7 @@ use walkeeper::defaults::{
 use zenith_utils::auth::{Claims, Scope};
 use zenith_utils::lsn::Lsn;
 use zenith_utils::postgres_backend::AuthType;
-use zenith_utils::zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId};
+use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
 use zenith_utils::GIT_VERSION;

 use pageserver::timelines::TimelineInfo;
@@ -72,13 +72,17 @@ struct TimelineTreeEl {
 //   * Providing CLI api to the pageserver
 //   * TODO: export/import to/from usual postgres
 fn main() -> Result<()> {
-    let branch_name_arg = Arg::new("name")
-        .long("name")
-        .short('n')
+    let branch_name_arg = Arg::new("branch-name")
+        .long("branch-name")
         .takes_value(true)
         .help("Name of the branch to be created or used as an alias for other services")
         .required(false);

+    let pg_node_arg = Arg::new("node").help("Postgres node name").required(false);
+
+    let safekeeper_node_arg = Arg::new("node")
+        .help("Safekeeper node name")
+        .required(false);

     let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);

     let tenant_id_arg = Arg::new("tenant-id")
@@ -199,6 +203,7 @@ fn main() -> Result<()> {
             .subcommand(App::new("list").arg(tenant_id_arg.clone()))
             .subcommand(App::new("create")
                 .about("Create a postgres compute node")
+                .arg(pg_node_arg.clone())
                 .arg(branch_name_arg.clone())
                 .arg(tenant_id_arg.clone())
                 .arg(lsn_arg.clone())
@@ -211,20 +216,20 @@ fn main() -> Result<()> {
                 ))
             .subcommand(App::new("start")
                 .about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
-                .arg(branch_name_arg.clone())
+                .arg(pg_node_arg.clone())
                 .arg(tenant_id_arg.clone())
                 .arg(lsn_arg.clone())
                 .arg(port_arg.clone()))
             .subcommand(
                 App::new("stop")
-                    .arg(branch_name_arg.clone())
-                    .arg(tenant_id_arg.clone())
-                    .arg(
-                        Arg::new("destroy")
-                            .help("Also delete data directory (now optional, should be default in future)")
-                            .long("destroy")
-                            .required(false)
-                    )
+                    .arg(pg_node_arg.clone())
+                    .arg(tenant_id_arg.clone())
+                    .arg(
+                        Arg::new("destroy")
+                            .help("Also delete data directory (now optional, should be default in future)")
+                            .long("destroy")
+                            .required(false)
+                    )
                 )
             )
@@ -483,9 +488,10 @@ fn handle_init(init_match: &ArgMatches) -> Result<LocalEnv> {
         exit(1);
     });

-    env.branch_name_mappings.insert(
+    env.register_branch_mapping(
         DEFAULT_BRANCH_NAME.to_owned(),
-        ZTenantTimelineId::new(initial_tenant_id, initial_timeline_id),
+        initial_tenant_id,
+        initial_timeline_id,
     );

     Ok(env)
@@ -508,14 +514,15 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
             }
         }
         Some(("create", create_match)) => {
-            let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(|| ZTenantId::generate());
+            let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(ZTenantId::generate);
             println!("using tenant id {}", tenant_id);
             let initial_timeline_id_argument = parse_timeline_id(create_match)?;
             let initial_timeline_id =
                 pageserver.tenant_create(tenant_id, initial_timeline_id_argument)?;
-            env.branch_name_mappings.insert(
+            env.register_branch_mapping(
                 DEFAULT_BRANCH_NAME.to_owned(),
-                ZTenantTimelineId::new(tenant_id, initial_timeline_id),
+                tenant_id,
+                initial_timeline_id,
             );
             println!(
                 "tenant {} successfully created on the pageserver, initial timeline: '{}'",
@@ -541,7 +548,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
             let tenant_id = get_tenant_id(create_match, env)?;
             let new_timeline_id = ZTimelineId::generate();
             let new_branch_name = create_match
-                .value_of("name")
+                .value_of("branch-name")
                 .ok_or(anyhow!("No branch name provided"))?;
             let timeline = pageserver.timeline_create(tenant_id, new_timeline_id, None, None)?;
@@ -556,10 +563,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
                     )
                 }
             };
-            env.branch_name_mappings.insert(
-                new_branch_name.to_string(),
-                ZTenantTimelineId::new(tenant_id, new_timeline_id),
-            );
+            env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id);

             println!(
                 "Created timeline '{}' at Lsn {} for tenant: {}",
@@ -572,19 +576,19 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
             let tenant_id = get_tenant_id(branch_match, env)?;
             let new_timeline_id = ZTimelineId::generate();
             let new_branch_name = branch_match
-                .value_of("name")
+                .value_of("branch-name")
                 .ok_or(anyhow!("No branch name provided"))?;
             let ancestor_branch_name = branch_match
                 .value_of("ancestor-branch-name")
                 .ok_or(anyhow!("No ancestor branch name provided"))?;
             let ancestor_timeline_id = env
-                .branch_name_mappings
-                .get(ancestor_branch_name)
-                .ok_or(anyhow!(
-                    "Found no timeline id for branch name '{}'",
-                    ancestor_branch_name
-                ))?
-                .timeline_id;
+                .get_branch_timeline_id(ancestor_branch_name, tenant_id)
+                .ok_or_else(|| {
+                    anyhow!(
+                        "Found no timeline id for branch name '{}'",
+                        ancestor_branch_name
+                    )
+                })?;

             let start_lsn = branch_match
                 .value_of("ancestor-start-lsn")
@@ -608,10 +612,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
                 ),
             };

-            env.branch_name_mappings.insert(
-                new_branch_name.to_string(),
-                ZTenantTimelineId::new(tenant_id, new_timeline_id),
-            );
+            env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id);

             println!(
                 "Created timeline '{}' at Lsn {} for tenant: {}. Ancestor timeline: '{}'",
@@ -638,7 +639,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     // All subcommands take an optional --tenant-id option
     let tenant_id = get_tenant_id(sub_args, env)?;

-    let node_name = sub_args.value_of("name").unwrap_or(DEFAULT_BRANCH_NAME);

     match sub_name {
         "list" => {
@@ -677,28 +677,37 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
             }
         }
         "create" => {
+            let branch_name = sub_args
+                .value_of("branch-name")
+                .unwrap_or(DEFAULT_BRANCH_NAME);
+            let node_name = sub_args
+                .value_of("node")
+                .map(ToString::to_string)
+                .unwrap_or_else(|| format!("{}_node", branch_name));
             let lsn = sub_args
                 .value_of("lsn")
                 .map(Lsn::from_str)
                 .transpose()
                 .context("Failed to parse Lsn from the request")?;
             let timeline_id = env
-                .branch_name_mappings
-                .get(node_name)
-                .ok_or(anyhow!("Found no timeline id for node name {}", node_name))?
-                .timeline_id;
+                .get_branch_timeline_id(branch_name, tenant_id)
+                .ok_or_else(|| anyhow!("Found no timeline id for branch name '{}'", branch_name))?;

             let port: Option<u16> = match sub_args.value_of("port") {
                 Some(p) => Some(p.parse()?),
                 None => None,
             };
-            cplane.new_node(tenant_id, node_name, timeline_id, lsn, port)?;
+            cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port)?;
         }
         "start" => {
             let port: Option<u16> = match sub_args.value_of("port") {
                 Some(p) => Some(p.parse()?),
                 None => None,
             };
+            let node_name = sub_args
+                .value_of("node")
+                .ok_or_else(|| anyhow!("No node name was provided to start"))?;
             let node = cplane.nodes.get(&(tenant_id, node_name.to_owned()));
@@ -714,11 +723,14 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
                 println!("Starting existing postgres {}...", node_name);
                 node.start(&auth_token)?;
             } else {
+                let branch_name = sub_args
+                    .value_of("branch-name")
+                    .unwrap_or(DEFAULT_BRANCH_NAME);
                 let timeline_id = env
-                    .branch_name_mappings
-                    .get(node_name)
-                    .ok_or(anyhow!("Found no timeline id for node name {}", node_name))?
-                    .timeline_id;
+                    .get_branch_timeline_id(branch_name, tenant_id)
+                    .ok_or_else(|| {
+                        anyhow!("Found no timeline id for branch name '{}'", branch_name)
+                    })?;
                 let lsn = sub_args
                     .value_of("lsn")
                     .map(Lsn::from_str)
@@ -738,6 +750,9 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
             }
         }
         "stop" => {
+            let node_name = sub_args
+                .value_of("node")
+                .ok_or_else(|| anyhow!("No node name was provided to stop"))?;
             let destroy = sub_args.is_present("destroy");
             let node = cplane
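Net effect on the CLI: `--name` is gone; branches are addressed with `--branch-name` (resolved to a timeline through the mapping), while compute nodes get a positional name used for the data directory and for `start`/`stop`. A hypothetical session under the new flags (names as defined in the clap setup above; `--tenant-id` falls back to its default, and `pg create` would derive `<branch>_node` if the node name were omitted):

```sh
zenith timeline branch --branch-name feature --ancestor-branch-name main
zenith pg create --branch-name feature feature_node
zenith pg start feature_node
zenith pg stop --destroy feature_node
```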

View File

@@ -317,7 +317,7 @@ zid_newtype!(ZTenantId);
 mutual_from!(ZTenantId, HexZTenantId);

 // A pair uniquely identifying Zenith instance.
-#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
 pub struct ZTenantTimelineId {
     pub tenant_id: ZTenantId,
     pub timeline_id: ZTimelineId,