diff --git a/Cargo.lock b/Cargo.lock index 55203b72fc..e3f87e6c31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1001,6 +1001,7 @@ dependencies = [ "comfy-table", "compute_api", "git-version", + "hex", "hyper", "nix 0.26.2", "once_cell", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 5ea99cb07e..d02486464a 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -10,6 +10,7 @@ clap.workspace = true comfy-table.workspace = true git-version.workspace = true nix.workspace = true +hex.workspace = true once_cell.workspace = true postgres.workspace = true hyper.workspace = true diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index d401518614..a5ff93cbdf 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -1,6 +1,8 @@ -use std::{path::PathBuf, process::Child}; - use crate::{background_process, local_env::LocalEnv}; +use anyhow::anyhow; +use serde::{Deserialize, Serialize}; +use std::{path::PathBuf, process::Child}; +use utils::id::{NodeId, TenantId}; pub struct AttachmentService { env: LocalEnv, @@ -10,6 +12,17 @@ pub struct AttachmentService { const COMMAND: &str = "attachment_service"; +#[derive(Serialize, Deserialize)] +pub struct AttachHookRequest { + pub tenant_id: TenantId, + pub pageserver_id: Option, +} + +#[derive(Serialize, Deserialize)] +pub struct AttachHookResponse { + pub gen: Option, +} + impl AttachmentService { pub fn from_env(env: &LocalEnv) -> Self { let path = env.base_data_dir.join("attachments.json"); @@ -53,4 +66,38 @@ impl AttachmentService { pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { background_process::stop_process(immediate, COMMAND, &self.pid_file()) } + + /// Call into the attach_hook API, for use before handing out attachments to pageservers + pub fn attach_hook( + &self, + tenant_id: TenantId, + pageserver_id: NodeId, + ) -> anyhow::Result> { + use hyper::StatusCode; + + let url = self + .env + .pageserver + .control_plane_api + .clone() + .unwrap() + .join("attach_hook") + .unwrap(); + let client = reqwest::blocking::ClientBuilder::new() + .build() + .expect("Failed to construct http client"); + + let request = AttachHookRequest { + tenant_id, + pageserver_id: Some(pageserver_id), + }; + + let response = client.post(url).json(&request).send()?; + if response.status() != StatusCode::OK { + return Err(anyhow!("Unexpected status {0}", response.status())); + } + + let response = response.json::()?; + Ok(response.gen) + } } diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs index 103b721915..5958512018 100644 --- a/control_plane/src/bin/attachment_service.rs +++ b/control_plane/src/bin/attachment_service.rs @@ -6,6 +6,7 @@ /// use anyhow::anyhow; use clap::Parser; +use hex::FromHex; use hyper::StatusCode; use hyper::{Body, Request, Response}; use pageserver_api::control_api::*; @@ -25,6 +26,8 @@ use utils::{ tcp_listener, }; +use control_plane::attachment_service::{AttachHookRequest, AttachHookResponse}; + #[derive(Parser)] #[command(author, version, about, long_about = None)] #[command(arg_required_else_help(true))] @@ -37,7 +40,7 @@ struct Cli { } // The persistent state of each Tenant -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] struct TenantState { // Currently attached pageserver pageserver: Option, @@ -47,9 +50,71 @@ struct TenantState { generation: u32, } +fn to_hex_map(input: &HashMap, serializer: S) -> Result +where + S: serde::Serializer, + V: Clone + Serialize, +{ + eprintln!("to_hex_map"); + let transformed = input + .iter() + .map(|(k, v)| (HexTenantId::new(k.clone()), v.clone())); + + transformed + .collect::>() + .serialize(serializer) +} + +fn from_hex_map<'de, D, V>(deserializer: D) -> Result, D::Error> +where + D: serde::de::Deserializer<'de>, + V: Deserialize<'de>, +{ + eprintln!("from_hex_map"); + let hex_map = HashMap::::deserialize(deserializer)?; + + Ok(hex_map.into_iter().map(|(k, v)| (k.take(), v)).collect()) +} + +#[derive(Eq, PartialEq, Clone, Hash)] +struct HexTenantId(TenantId); + +impl HexTenantId { + fn new(t: TenantId) -> Self { + Self(t) + } + + fn take(self) -> TenantId { + self.0 + } +} + +impl Serialize for HexTenantId { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let hex = self.0.hex_encode(); + serializer.collect_str(&hex) + } +} + +impl<'de> Deserialize<'de> for HexTenantId { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let string = String::deserialize(deserializer)?; + TenantId::from_hex(string) + .map(|t| HexTenantId::new(t)) + .map_err(|e| serde::de::Error::custom(format!("{e}"))) + } +} + // Top level state available to all HTTP handlers #[derive(Serialize, Deserialize)] struct PersistentState { + #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] tenants: HashMap, #[serde(skip)] @@ -162,18 +227,6 @@ async fn handle_validate(mut req: Request) -> Result, ApiEr json_response(StatusCode::OK, response) } - -#[derive(Serialize, Deserialize)] -struct AttachHookRequest { - tenant_id: TenantId, - pageserver_id: Option, -} - -#[derive(Serialize, Deserialize)] -struct AttachHookResponse { - gen: Option, -} - /// Call into this before attaching a tenant to a pageserver, to acquire a generation number /// (in the real control plane this is unnecessary, because the same program is managing /// generation numbers and doing attachments). diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 7dc659cd6c..2976363fa7 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -284,14 +284,6 @@ fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result> { .context("Failed to parse tenant id from the argument string") } -fn parse_generation(sub_match: &ArgMatches) -> anyhow::Result> { - sub_match - .get_one::("generation") - .map(|tenant_id| u32::from_str(tenant_id)) - .transpose() - .context("Failed to parse generation rom the argument string") -} - fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result> { sub_match .get_one::("timeline-id") @@ -356,15 +348,25 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an } } Some(("create", create_match)) => { - let initial_tenant_id = parse_tenant_id(create_match)?; - let generation = parse_generation(create_match)?; let tenant_conf: HashMap<_, _> = create_match .get_many::("config") .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) .unwrap_or_default(); - let new_tenant_id = - pageserver.tenant_create(initial_tenant_id, generation, tenant_conf)?; - println!("tenant {new_tenant_id} successfully created on the pageserver"); + + // If tenant ID was not specified, generate one + let tenant_id = parse_tenant_id(create_match)?.unwrap_or(TenantId::generate()); + + let generation = if env.pageserver.control_plane_api.is_some() { + // We must register the tenant with the attachment service, so + // that when the pageserver restarts, it will be re-attached. + let attachment_service = AttachmentService::from_env(env); + attachment_service.attach_hook(tenant_id, env.pageserver.id)? + } else { + None + }; + + pageserver.tenant_create(tenant_id, generation, tenant_conf)?; + println!("tenant {tenant_id} successfully created on the pageserver"); // Create an initial timeline for the new tenant let new_timeline_id = parse_timeline_id(create_match)?; @@ -374,7 +376,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an .context("Failed to parse postgres version from the argument string")?; let timeline_info = pageserver.timeline_create( - new_tenant_id, + tenant_id, new_timeline_id, None, None, @@ -385,17 +387,17 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an env.register_branch_mapping( DEFAULT_BRANCH_NAME.to_string(), - new_tenant_id, + tenant_id, new_timeline_id, )?; println!( - "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}", + "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}", ); if create_match.get_flag("set-default") { - println!("Setting tenant {new_tenant_id} as a default one"); - env.default_tenant_id = Some(new_tenant_id); + println!("Setting tenant {tenant_id} as a default one"); + env.default_tenant_id = Some(tenant_id); } } Some(("set-default", set_default_match)) => { @@ -940,11 +942,14 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow broker::start_broker_process(env)?; - let attachment_service = AttachmentService::from_env(env); - if let Err(e) = attachment_service.start() { - eprintln!("attachment_service start failed: {:#}", e); - try_stop_all(env, true); - exit(1); + // Only start the attachment service if the pageserver is configured to need it + if env.pageserver.control_plane_api.is_some() { + let attachment_service = AttachmentService::from_env(env); + if let Err(e) = attachment_service.start() { + eprintln!("attachment_service start failed: {:#}", e); + try_stop_all(env, true); + exit(1); + } } let pageserver = PageServerNode::from_env(env); @@ -1006,9 +1011,11 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { eprintln!("neon broker stop failed: {e:#}"); } - let attachment_service = AttachmentService::from_env(env); - if let Err(e) = attachment_service.stop(immediate) { - eprintln!("attachment service stop failed: {e:#}"); + if env.pageserver.control_plane_api.is_some() { + let attachment_service = AttachmentService::from_env(env); + if let Err(e) = attachment_service.stop(immediate) { + eprintln!("attachment service stop failed: {e:#}"); + } } } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index c674b982c2..eecc2479ff 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -323,7 +323,7 @@ impl PageServerNode { pub fn tenant_create( &self, - new_tenant_id: Option, + new_tenant_id: TenantId, generation: Option, settings: HashMap<&str, &str>, ) -> anyhow::Result { @@ -390,9 +390,6 @@ impl PageServerNode { .context("Failed to parse 'gc_feedback' as bool")?, }; - // If tenant ID was not specified, generate one - let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate()); - let request = models::TenantCreateRequest { new_tenant_id, generation, diff --git a/libs/pageserver_api/src/control_api.rs b/libs/pageserver_api/src/control_api.rs index 3c1b1ffc3a..3ed7b03be5 100644 --- a/libs/pageserver_api/src/control_api.rs +++ b/libs/pageserver_api/src/control_api.rs @@ -39,3 +39,16 @@ pub struct ValidateResponseTenant { pub id: TenantId, pub valid: bool, } + +#[cfg(testing)] +#[derive(Serialize, Deserialize)] +pub struct AttachHookRequest { + tenant_id: TenantId, + pageserver_id: Option, +} + +#[cfg(testing)] +#[derive(Serialize, Deserialize)] +pub struct AttachHookResponse { + gen: Option, +} diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index 2ce92ee914..b93146e3a0 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -50,7 +50,7 @@ impl Id { Id::from(tli_buf) } - fn hex_encode(&self) -> String { + pub fn hex_encode(&self) -> String { static HEX: &[u8] = b"0123456789abcdef"; let mut buf = vec![0u8; self.0.len() * 2]; @@ -133,6 +133,10 @@ macro_rules! id_newtype { pub const fn from_array(b: [u8; 16]) -> Self { $t(Id(b)) } + + pub fn hex_encode(&self) -> String { + self.0.hex_encode() + } } impl FromStr for $t {