control_plane: implement attach hook

This commit is contained in:
John Spray
2023-08-29 09:35:34 +01:00
parent 265d3b4352
commit 3eff65618d
8 changed files with 169 additions and 46 deletions

1
Cargo.lock generated
View File

@@ -1001,6 +1001,7 @@ dependencies = [
"comfy-table",
"compute_api",
"git-version",
"hex",
"hyper",
"nix 0.26.2",
"once_cell",

View File

@@ -10,6 +10,7 @@ clap.workspace = true
comfy-table.workspace = true
git-version.workspace = true
nix.workspace = true
hex.workspace = true
once_cell.workspace = true
postgres.workspace = true
hyper.workspace = true

View File

@@ -1,6 +1,8 @@
use std::{path::PathBuf, process::Child};
use crate::{background_process, local_env::LocalEnv};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use std::{path::PathBuf, process::Child};
use utils::id::{NodeId, TenantId};
pub struct AttachmentService {
env: LocalEnv,
@@ -10,6 +12,17 @@ pub struct AttachmentService {
const COMMAND: &str = "attachment_service";
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
pub tenant_id: TenantId,
pub pageserver_id: Option<NodeId>,
}
#[derive(Serialize, Deserialize)]
pub struct AttachHookResponse {
pub gen: Option<u32>,
}
impl AttachmentService {
pub fn from_env(env: &LocalEnv) -> Self {
let path = env.base_data_dir.join("attachments.json");
@@ -53,4 +66,38 @@ impl AttachmentService {
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
background_process::stop_process(immediate, COMMAND, &self.pid_file())
}
/// Call into the attach_hook API, for use before handing out attachments to pageservers
pub fn attach_hook(
&self,
tenant_id: TenantId,
pageserver_id: NodeId,
) -> anyhow::Result<Option<u32>> {
use hyper::StatusCode;
let url = self
.env
.pageserver
.control_plane_api
.clone()
.unwrap()
.join("attach_hook")
.unwrap();
let client = reqwest::blocking::ClientBuilder::new()
.build()
.expect("Failed to construct http client");
let request = AttachHookRequest {
tenant_id,
pageserver_id: Some(pageserver_id),
};
let response = client.post(url).json(&request).send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {0}", response.status()));
}
let response = response.json::<AttachHookResponse>()?;
Ok(response.gen)
}
}

View File

@@ -6,6 +6,7 @@
///
use anyhow::anyhow;
use clap::Parser;
use hex::FromHex;
use hyper::StatusCode;
use hyper::{Body, Request, Response};
use pageserver_api::control_api::*;
@@ -25,6 +26,8 @@ use utils::{
tcp_listener,
};
use control_plane::attachment_service::{AttachHookRequest, AttachHookResponse};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
@@ -37,7 +40,7 @@ struct Cli {
}
// The persistent state of each Tenant
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
struct TenantState {
// Currently attached pageserver
pageserver: Option<NodeId>,
@@ -47,9 +50,71 @@ struct TenantState {
generation: u32,
}
fn to_hex_map<S, V>(input: &HashMap<TenantId, V>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
V: Clone + Serialize,
{
eprintln!("to_hex_map");
let transformed = input
.iter()
.map(|(k, v)| (HexTenantId::new(k.clone()), v.clone()));
transformed
.collect::<HashMap<HexTenantId, V>>()
.serialize(serializer)
}
fn from_hex_map<'de, D, V>(deserializer: D) -> Result<HashMap<TenantId, V>, D::Error>
where
D: serde::de::Deserializer<'de>,
V: Deserialize<'de>,
{
eprintln!("from_hex_map");
let hex_map = HashMap::<HexTenantId, V>::deserialize(deserializer)?;
Ok(hex_map.into_iter().map(|(k, v)| (k.take(), v)).collect())
}
#[derive(Eq, PartialEq, Clone, Hash)]
struct HexTenantId(TenantId);
impl HexTenantId {
fn new(t: TenantId) -> Self {
Self(t)
}
fn take(self) -> TenantId {
self.0
}
}
impl Serialize for HexTenantId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let hex = self.0.hex_encode();
serializer.collect_str(&hex)
}
}
impl<'de> Deserialize<'de> for HexTenantId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let string = String::deserialize(deserializer)?;
TenantId::from_hex(string)
.map(|t| HexTenantId::new(t))
.map_err(|e| serde::de::Error::custom(format!("{e}")))
}
}
// Top level state available to all HTTP handlers
#[derive(Serialize, Deserialize)]
struct PersistentState {
#[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
tenants: HashMap<TenantId, TenantState>,
#[serde(skip)]
@@ -162,18 +227,6 @@ async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiEr
json_response(StatusCode::OK, response)
}
#[derive(Serialize, Deserialize)]
struct AttachHookRequest {
tenant_id: TenantId,
pageserver_id: Option<NodeId>,
}
#[derive(Serialize, Deserialize)]
struct AttachHookResponse {
gen: Option<u32>,
}
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments).

View File

@@ -284,14 +284,6 @@ fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantId>> {
.context("Failed to parse tenant id from the argument string")
}
fn parse_generation(sub_match: &ArgMatches) -> anyhow::Result<Option<u32>> {
sub_match
.get_one::<String>("generation")
.map(|tenant_id| u32::from_str(tenant_id))
.transpose()
.context("Failed to parse generation rom the argument string")
}
fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId>> {
sub_match
.get_one::<String>("timeline-id")
@@ -356,15 +348,25 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
}
}
Some(("create", create_match)) => {
let initial_tenant_id = parse_tenant_id(create_match)?;
let generation = parse_generation(create_match)?;
let tenant_conf: HashMap<_, _> = create_match
.get_many::<String>("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let new_tenant_id =
pageserver.tenant_create(initial_tenant_id, generation, tenant_conf)?;
println!("tenant {new_tenant_id} successfully created on the pageserver");
// If tenant ID was not specified, generate one
let tenant_id = parse_tenant_id(create_match)?.unwrap_or(TenantId::generate());
let generation = if env.pageserver.control_plane_api.is_some() {
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, env.pageserver.id)?
} else {
None
};
pageserver.tenant_create(tenant_id, generation, tenant_conf)?;
println!("tenant {tenant_id} successfully created on the pageserver");
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
@@ -374,7 +376,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.context("Failed to parse postgres version from the argument string")?;
let timeline_info = pageserver.timeline_create(
new_tenant_id,
tenant_id,
new_timeline_id,
None,
None,
@@ -385,17 +387,17 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
new_tenant_id,
tenant_id,
new_timeline_id,
)?;
println!(
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
);
if create_match.get_flag("set-default") {
println!("Setting tenant {new_tenant_id} as a default one");
env.default_tenant_id = Some(new_tenant_id);
println!("Setting tenant {tenant_id} as a default one");
env.default_tenant_id = Some(tenant_id);
}
}
Some(("set-default", set_default_match)) => {
@@ -940,11 +942,14 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow
broker::start_broker_process(env)?;
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.start() {
eprintln!("attachment_service start failed: {:#}", e);
try_stop_all(env, true);
exit(1);
// Only start the attachment service if the pageserver is configured to need it
if env.pageserver.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.start() {
eprintln!("attachment_service start failed: {:#}", e);
try_stop_all(env, true);
exit(1);
}
}
let pageserver = PageServerNode::from_env(env);
@@ -1006,9 +1011,11 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
eprintln!("neon broker stop failed: {e:#}");
}
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.stop(immediate) {
eprintln!("attachment service stop failed: {e:#}");
if env.pageserver.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.stop(immediate) {
eprintln!("attachment service stop failed: {e:#}");
}
}
}

View File

@@ -323,7 +323,7 @@ impl PageServerNode {
pub fn tenant_create(
&self,
new_tenant_id: Option<TenantId>,
new_tenant_id: TenantId,
generation: Option<u32>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
@@ -390,9 +390,6 @@ impl PageServerNode {
.context("Failed to parse 'gc_feedback' as bool")?,
};
// If tenant ID was not specified, generate one
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
let request = models::TenantCreateRequest {
new_tenant_id,
generation,

View File

@@ -39,3 +39,16 @@ pub struct ValidateResponseTenant {
pub id: TenantId,
pub valid: bool,
}
#[cfg(testing)]
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
tenant_id: TenantId,
pageserver_id: Option<NodeId>,
}
#[cfg(testing)]
#[derive(Serialize, Deserialize)]
pub struct AttachHookResponse {
gen: Option<u32>,
}

View File

@@ -50,7 +50,7 @@ impl Id {
Id::from(tli_buf)
}
fn hex_encode(&self) -> String {
pub fn hex_encode(&self) -> String {
static HEX: &[u8] = b"0123456789abcdef";
let mut buf = vec![0u8; self.0.len() * 2];
@@ -133,6 +133,10 @@ macro_rules! id_newtype {
pub const fn from_array(b: [u8; 16]) -> Self {
$t(Id(b))
}
pub fn hex_encode(&self) -> String {
self.0.hex_encode()
}
}
impl FromStr for $t {