mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
neon_local: create tenants with multiple shards
This commit is contained in:
@@ -15,9 +15,9 @@ use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::tenant_migration::migrate_tenant;
|
||||
use control_plane::{broker, local_env};
|
||||
use pageserver_api::models::TimelineInfo;
|
||||
use pageserver_api::models::{LocationConfig, LocationConfigMode, TimelineInfo};
|
||||
use pageserver_api::{
|
||||
shard, DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
};
|
||||
use postgres_backend::AuthType;
|
||||
@@ -30,6 +30,7 @@ use std::path::PathBuf;
|
||||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
|
||||
use utils::generation::Generation;
|
||||
use utils::{
|
||||
auth::{Claims, Scope},
|
||||
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
||||
@@ -376,6 +377,8 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
|
||||
fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
|
||||
match tenant_match.subcommand() {
|
||||
Some(("list", _)) => {
|
||||
// TODO: make command aware of multiple pageservers
|
||||
let pageserver = get_default_pageserver(env);
|
||||
for t in pageserver.tenant_list()? {
|
||||
println!("{} {:?}", t.id, t.state);
|
||||
}
|
||||
@@ -386,23 +389,17 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
||||
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
let shard_count = create_match.get_many::<u8>("shard-count").unwrap_or(1);
|
||||
let shard_count: u8 = create_match
|
||||
.get_one::<u8>("shard-count")
|
||||
.cloned()
|
||||
.unwrap_or(1);
|
||||
|
||||
// If tenant ID was not specified, generate one
|
||||
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
|
||||
|
||||
// TODO: per-shard generations
|
||||
let generation = if env.control_plane_api.is_some() {
|
||||
// We must register the tenant with the attachment service, so
|
||||
// that when the pageserver restarts, it will be re-attached.
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// We will create an initial timeline for the new tenant
|
||||
let new_timeline_id = parse_timeline_id(create_match)?;
|
||||
let new_timeline_id =
|
||||
parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate());
|
||||
let pg_version = create_match
|
||||
.get_one::<u32>("pg-version")
|
||||
.copied()
|
||||
@@ -411,52 +408,63 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
||||
// TODO: implement ability for one pageserver to hold multiple
|
||||
// shards for the same tenant. Until then, we must place each
|
||||
// shard on a different pageserver.
|
||||
assert!(env.pageservers.len() >= shard_count);
|
||||
assert!(env.pageservers.len() >= shard_count as usize);
|
||||
|
||||
for shard in 0..shard_count {
|
||||
let ps_conf = env.pageservers.get(shard as usize).unwrap();
|
||||
for shard_number in 0..shard_count {
|
||||
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
|
||||
let pageserver = PageServerNode::from_env(env, ps_conf);
|
||||
|
||||
// TODO: shard-aware tenant creation. Currently tenant creation on the
|
||||
// TODO: per-shard generations
|
||||
let generation = if env.control_plane_api.is_some() {
|
||||
// We must register the tenant with the attachment service, so
|
||||
// that when the pageserver restarts, it will be re-attached.
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// TODO: shard-aware POST /v1/tenant. Currently tenant creation on the
|
||||
// pageserver is a no-op, but we shouldn't skip the command entirely.
|
||||
|
||||
// pageserver.location_conf...
|
||||
let tenant_conf = PageServerNode::build_config(tenant_conf.clone())?;
|
||||
|
||||
// pageserver.tenant_create(
|
||||
// tenant_id,
|
||||
// shard_number,
|
||||
// shard_count,
|
||||
// generation,
|
||||
// tenant_conf,
|
||||
// )?;
|
||||
|
||||
todo!();
|
||||
let location_conf = LocationConfig {
|
||||
shard_count,
|
||||
shard_number,
|
||||
shard_stripe_size: 32000,
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
generation: generation.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
tenant_conf,
|
||||
};
|
||||
pageserver.location_config(tenant_id, location_conf)?;
|
||||
println!(
|
||||
"tenant {tenant_id} successfully created on pageserver {}",
|
||||
pageserver.conf.id
|
||||
);
|
||||
}
|
||||
|
||||
let timeline_info = pageserver.timeline_create(
|
||||
for shard_number in 0..shard_count {
|
||||
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
|
||||
let pageserver = PageServerNode::from_env(env, ps_conf);
|
||||
pageserver.timeline_create(
|
||||
tenant_id,
|
||||
new_timeline_id,
|
||||
Some(new_timeline_id),
|
||||
None,
|
||||
None,
|
||||
Some(pg_version),
|
||||
)?;
|
||||
let new_timeline_id = timeline_info.timeline_id;
|
||||
let last_record_lsn = timeline_info.last_record_lsn;
|
||||
|
||||
env.register_branch_mapping(
|
||||
DEFAULT_BRANCH_NAME.to_string(),
|
||||
tenant_id,
|
||||
new_timeline_id,
|
||||
)?;
|
||||
|
||||
println!(
|
||||
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
|
||||
);
|
||||
}
|
||||
|
||||
env.register_branch_mapping(
|
||||
DEFAULT_BRANCH_NAME.to_string(),
|
||||
tenant_id,
|
||||
new_timeline_id,
|
||||
)?;
|
||||
|
||||
println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
|
||||
|
||||
if create_match.get_flag("set-default") {
|
||||
println!("Setting tenant {tenant_id} as a default one");
|
||||
env.default_tenant_id = Some(tenant_id);
|
||||
@@ -475,6 +483,8 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
||||
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
// TODO: make command aware of multiple pageservers
|
||||
let pageserver = get_default_pageserver(env);
|
||||
pageserver
|
||||
.tenant_config(tenant_id, tenant_conf)
|
||||
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
|
||||
@@ -1373,6 +1383,7 @@ fn cli() -> Command {
|
||||
.arg(pg_version_arg.clone())
|
||||
.arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
|
||||
.help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
|
||||
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
|
||||
)
|
||||
.subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
|
||||
.about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))
|
||||
|
||||
@@ -338,15 +338,8 @@ impl PageServerNode {
|
||||
.json()?)
|
||||
}
|
||||
|
||||
pub fn tenant_create(
|
||||
&self,
|
||||
new_tenant_id: TenantId,
|
||||
generation: Option<u32>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let mut settings = settings.clone();
|
||||
|
||||
let config = models::TenantConfig {
|
||||
pub fn build_config(mut settings: HashMap<&str, &str>) -> anyhow::Result<models::TenantConfig> {
|
||||
Ok(models::TenantConfig {
|
||||
checkpoint_distance: settings
|
||||
.remove("checkpoint_distance")
|
||||
.map(|x| x.parse::<u64>())
|
||||
@@ -405,8 +398,16 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
};
|
||||
})
|
||||
}
|
||||
|
||||
pub fn tenant_create(
|
||||
&self,
|
||||
new_tenant_id: TenantId,
|
||||
generation: Option<u32>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let config = Self::build_config(settings.clone())?;
|
||||
let request = models::TenantCreateRequest {
|
||||
new_tenant_id,
|
||||
generation,
|
||||
|
||||
@@ -102,6 +102,9 @@ pub fn migrate_tenant(
|
||||
println!("🔁 Already attached to {origin_ps_id}, freshening...");
|
||||
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
|
||||
let dest_conf = LocationConfig {
|
||||
shard_count: 0,
|
||||
shard_number: 0,
|
||||
shard_stripe_size: 0,
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
generation: gen.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
@@ -115,6 +118,9 @@ pub fn migrate_tenant(
|
||||
println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode");
|
||||
|
||||
let stale_conf = LocationConfig {
|
||||
shard_count: 0,
|
||||
shard_number: 0,
|
||||
shard_stripe_size: 0,
|
||||
mode: LocationConfigMode::AttachedStale,
|
||||
generation: Some(Generation::new(*generation)),
|
||||
secondary_conf: None,
|
||||
@@ -127,6 +133,9 @@ pub fn migrate_tenant(
|
||||
|
||||
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
|
||||
let dest_conf = LocationConfig {
|
||||
shard_count: 0,
|
||||
shard_number: 0,
|
||||
shard_stripe_size: 0,
|
||||
mode: LocationConfigMode::AttachedMulti,
|
||||
generation: gen.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
@@ -171,6 +180,9 @@ pub fn migrate_tenant(
|
||||
|
||||
// Downgrade to a secondary location
|
||||
let secondary_conf = LocationConfig {
|
||||
shard_count: 0,
|
||||
shard_number: 0,
|
||||
shard_stripe_size: 0,
|
||||
mode: LocationConfigMode::Secondary,
|
||||
generation: None,
|
||||
secondary_conf: Some(LocationConfigSecondary { warm: true }),
|
||||
@@ -189,6 +201,9 @@ pub fn migrate_tenant(
|
||||
dest_ps.conf.id
|
||||
);
|
||||
let dest_conf = LocationConfig {
|
||||
shard_count: 0,
|
||||
shard_number: 0,
|
||||
shard_stripe_size: 0,
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
generation: gen.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
|
||||
Reference in New Issue
Block a user