From 347bd012b34a6d46aa584cebfaba7af092a511f6 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 8 Dec 2023 10:09:19 +0000 Subject: [PATCH] neon_local: attachment service status, refactors --- control_plane/src/attachment_service.rs | 138 ++++++++++++-------- control_plane/src/bin/attachment_service.rs | 6 + control_plane/src/bin/neon_local.rs | 17 ++- 3 files changed, 106 insertions(+), 55 deletions(-) diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 5c18cd6e5c..65eed2980e 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -1,13 +1,15 @@ use crate::{background_process, local_env::LocalEnv}; use anyhow::anyhow; use camino::Utf8PathBuf; -use hyper::StatusCode; +use hyper::{Method, StatusCode}; use pageserver_api::{ models::{TenantCreateRequest, TimelineCreateRequest}, shard::TenantShardId, }; -use serde::{Deserialize, Serialize}; +use postgres_connection::parse_host_port; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{path::PathBuf, process::Child}; +use tracing::instrument; use utils::id::{NodeId, TenantId}; pub struct AttachmentService { @@ -110,24 +112,79 @@ impl AttachmentService { pub async fn start(&self) -> anyhow::Result { let path_str = self.path.to_string_lossy(); - background_process::start_process( + let result = background_process::start_process( COMMAND, &self.env.base_data_dir, &self.env.attachment_service_bin(), ["-l", &self.listen, "-p", &path_str], [], - background_process::InitialPidFile::Create(self.pid_file()), - // TODO: a real status check - || async move { anyhow::Ok(true) }, + background_process::InitialPidFile::Create(&self.pid_file()), + || match self.status() { + Ok(_) => Ok(true), + Err(_) => Ok(false), + }, ) - .await + .await; + + for ps_conf in &self.env.pageservers { + let (pg_host, pg_port) = + parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); + let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr) + .expect("Unable to parse listen_http_addr"); + self.node_register(NodeRegisterRequest { + node_id: ps_conf.id, + listen_pg_addr: pg_host.to_string(), + listen_pg_port: pg_port.unwrap_or(5432), + listen_http_addr: http_host.to_string(), + listen_http_port: http_port.unwrap_or(80), + })?; + } + + result } pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { background_process::stop_process(immediate, COMMAND, &self.pid_file()) } + /// Simple HTTP request wrapper for calling into attachment service + fn dispatch( + &self, + method: hyper::Method, + path: String, + body: Option, + ) -> anyhow::Result + where + RQ: Serialize + Sized, + RS: DeserializeOwned + Sized, + { + let url = self + .env + .control_plane_api + .clone() + .unwrap() + .join(&path) + .unwrap(); + + let mut builder = self.client.request(method, url); + if let Some(body) = body { + builder = builder.json(&body) + } + + let response = builder.send()?; + if response.status() != StatusCode::OK { + return Err(anyhow!( + "Unexpected status {} on {}", + response.status(), + path + )); + } + + Ok(response.json()?) + } + /// Call into the attach_hook API, for use before handing out attachments to pageservers + #[instrument(skip(self))] pub async fn attach_hook( &self, tenant_shard_id: TenantShardId, @@ -155,12 +212,11 @@ impl AttachmentService { Ok(response.gen) } + #[instrument(skip(self))] pub async fn inspect( &self, tenant_shard_id: TenantShardId, ) -> anyhow::Result> { - use hyper::StatusCode; - let url = self .env .control_plane_api @@ -180,58 +236,36 @@ impl AttachmentService { Ok(response.attachment) } - pub fn tenant_create(&self, req: TenantCreateRequest) -> anyhow::Result<()> { - let url = self - .env - .control_plane_api - .clone() - .unwrap() - .join("tenant") - .unwrap(); - - let response = self.client.post(url).json(&req).send()?; - if response.status() != StatusCode::OK { - return Err(anyhow!("Unexpected status {}", response.status())); - } - - Ok(()) + #[instrument(skip(self))] + pub fn tenant_create(&self, req: TenantCreateRequest) -> anyhow::Result { + self.dispatch(Method::POST, "tenant".to_string(), Some(req)) } + #[instrument(skip(self))] pub fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result { - let url = self - .env - .control_plane_api - .clone() - .unwrap() - .join(&format!("tenant/{tenant_id}")) - .unwrap(); - - let response = self.client.get(url).send()?; - if response.status() != StatusCode::OK { - return Err(anyhow!("Unexpected status {}", response.status())); - } - - Ok(response.json()?) + self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None) } + #[instrument(skip_all, fields(node_id=%req.node_id))] + pub fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> { + self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req)) + } + + #[instrument(skip(self))] + pub fn status(&self) -> anyhow::Result<()> { + self.dispatch::<(), ()>(Method::GET, "status".to_string(), None) + } + + #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))] pub fn tenant_timeline_create( &self, tenant_id: TenantId, req: TimelineCreateRequest, ) -> anyhow::Result<()> { - let url = self - .env - .control_plane_api - .clone() - .unwrap() - .join(&format!("tenant/{tenant_id}/timeline")) - .unwrap(); - - let response = self.client.post(url).json(&req).send()?; - if response.status() != StatusCode::OK { - return Err(anyhow!("Unexpected status {}", response.status())); - } - - Ok(()) + self.dispatch( + Method::POST, + format!("tenant/{tenant_id}/timeline"), + Some(req), + ) } } diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs index a9e96f7efd..dadcf2f606 100644 --- a/control_plane/src/bin/attachment_service.rs +++ b/control_plane/src/bin/attachment_service.rs @@ -715,9 +715,15 @@ async fn handle_node_register(mut req: Request) -> Result, json_response(StatusCode::OK, ()) } +/// Status endpoint is just used for checking that our HTTP listener is up +async fn handle_status(_req: Request) -> Result, ApiError> { + json_response(StatusCode::OK, ()) +} + fn make_router(persistent_state: PersistentState) -> RouterBuilder { endpoint::make_router() .data(Arc::new(State::new(persistent_state))) + .get("/status", |r| request_span(r, handle_status)) .post("/re-attach", |r| request_span(r, handle_re_attach)) .post("/validate", |r| request_span(r, handle_validate)) .post("/attach-hook", |r| request_span(r, handle_attach_hook)) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 26ad910d18..af21fbaf80 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -18,7 +18,7 @@ use control_plane::{broker, local_env}; use pageserver_api::models::{ ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo, }; -use pageserver_api::shard::TenantShardId; +use pageserver_api::shard::{ShardCount, TenantShardId}; use pageserver_api::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, @@ -418,9 +418,16 @@ async fn handle_tenant( Some(("create", create_match)) => { let tenant_conf: HashMap<_, _> = create_match .get_many::("config") - .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) + .map(|vals: clap::parser::ValuesRef<'_, String>| { + vals.flat_map(|c| c.split_once(':')).collect() + }) .unwrap_or_default(); + let shard_count: u8 = create_match + .get_one::("shard-count") + .cloned() + .unwrap_or(1); + let tenant_conf = PageServerNode::parse_config(tenant_conf)?; // If tenant ID was not specified, generate one @@ -435,7 +442,10 @@ async fn handle_tenant( // type is used both in attachment service (for creating tenants) and in pageserver (for creating shards) new_tenant_id: TenantShardId::unsharded(tenant_id), generation: None, - shard_parameters: ShardParameters::default(), + shard_parameters: ShardParameters { + count: ShardCount(shard_count), + stripe_size: None, + }, config: tenant_conf, })?; println!("tenant {tenant_id} successfully created on the pageserver"); @@ -1392,6 +1402,7 @@ fn cli() -> Command { .arg(pg_version_arg.clone()) .arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false) .help("Use this tenant in future CLI commands where tenant_id is needed, but not specified")) + .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)")) ) .subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true)) .about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))